Lavavel5.5源代码 - 并发数控制

app(‘redis‘)->connection(‘default‘)->funnel(‘key000‘)
               // 每个资源最大锁定10秒自动过期,只有60个资源(并发),在3秒内获取不到锁抛出异常
               ->releaseAfter(10)->limit(60)->block(3)
               ->then(function () {
                    // 获取锁成功,执行业务
               }, function () {
                   // 获取锁失败
                   return false;
               });

  

<?php

namespace Illuminate\Redis\Limiters;

use Illuminate\Contracts\Redis\LimiterTimeoutException;

class ConcurrencyLimiter
{
    /**
     * The Redis factory implementation.
     *
     * @var \Illuminate\Redis\Connections\Connection
     */
    protected $redis;

    /**
     * The name of the limiter.
     *
     * @var string
     */
    protected $name;

    /**
     * The allowed number of concurrent tasks.
     *
     * @var int
     */
    protected $maxLocks;

    /**
     * The number of seconds a slot should be maintained.
     *
     * @var int
     */
    protected $releaseAfter;

    /**
     * Create a new concurrency limiter instance.
     *
     * @param  \Illuminate\Redis\Connections\Connection  $redis
     * @param  string  $name
     * @param  int  $maxLocks
     * @param  int  $releaseAfter
     * @return void
     */
    public function __construct($redis, $name, $maxLocks, $releaseAfter)
    {
        $this->name = $name;
        $this->redis = $redis;
        $this->maxLocks = $maxLocks;
        $this->releaseAfter = $releaseAfter;
    }

    /**
     * Attempt to acquire the lock for the given number of seconds.
     *
     * @param  int  $timeout
     * @param  callable|null  $callback
     * @return bool
     * @throws \Illuminate\Contracts\Redis\LimiterTimeoutException
     */
    public function block($timeout, $callback = null)
    {
        $starting = time();

        while (! $slot = $this->acquire()) {
            if (time() - $timeout >= $starting) {
                throw new LimiterTimeoutException;
            }

            usleep(250 * 1000);
        }

        if (is_callable($callback)) {
            return tap($callback(), function () use ($slot) {
                $this->release($slot);
            });
        }

        return true;
    }

    /**
     * Attempt to acquire the lock.
     *
     * @return mixed
     */
    protected function acquire()
    {
        $slots = array_map(function ($i) {
            return $this->name.$i;
        }, range(1, $this->maxLocks));

        return $this->redis->eval($this->luaScript(), count($slots),
            ...array_merge($slots, [$this->name, $this->releaseAfter])
        );
    }

    /**
     * Get the Lua script for acquiring a lock.
     *
     * KEYS    - The keys that represent available slots
     * ARGV[1] - The limiter name
     * ARGV[2] - The number of seconds the slot should be reserved
     *
     * @return string
     */
    protected function luaScript()
    {
        return <<<‘LUA‘
for index, value in pairs(redis.call(‘mget‘, unpack(KEYS))) do
    if not value then
        redis.call(‘set‘, ARGV[1]..index, "1", "EX", ARGV[2])
        return ARGV[1]..index
    end
end
LUA;
    }

    /**
     * Release the lock.
     *
     * @param  string  $key
     * @return void
     */
    protected function release($key)
    {
        $this->redis->del($key);
    }
}

  

原文地址:https://www.cnblogs.com/xiaoyaogege/p/10724322.html

时间: 2024-10-12 04:27:42

Lavavel5.5源代码 - 并发数控制的相关文章

Linux Shell多进程并发以及并发数控制

1. 基础知识准备 1.1. linux后台进程 Unix是一个多任务系统,允许多用户同时运行多个程序.shell的元字符&提供了在后台运行不需要键盘输入的程序的方法.输入命令后,其后紧跟&字符,该命令就会被送往到linux后台执行,而终端又可以继续输入下一个命令了. 比如: sh a.sh & sh b.sh & sh c.sh & 这三个命令就会被同时送往linux后台执行,在这个程度上,认为这三个命令并发执行了. 1.2. linux文件描述符 文件描述符(缩

Java并发工具类之并发数控制神器Semaphore

Semaphore(信号量)使用来控制通知访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源. 我们可以这么理解Semaphore,比如一个厕所只有6个坑,同时只能满足6个人上厕所(变态除外),其他人想蹲坑,只能排队等待,如果有人从厕所出来,后面的一个人就可以进去.在这个例子中人就是线程,蹲坑表示线程在执行,离开表示线程执行完毕,而坑的数量就表示Semaphore的个数. 一.Semaphore的应用场景 Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如

Linux-Shell-使用mkfifo实现多任务并发及并发数控制

默认的情况下,Shell脚本中的命令是串行执行的,必须等到前一条命令执行完后才执行接下来的命令,但是如果我有一大批的的命令需要执行,而且互相又没有影响的情况下(有影响的话就比较复杂了),那么就要使用命令的并发执行了. 如下: #!/bin/bash IPLIST=/home/meta/ipinfo/iplist for i in $(cat ${IPLIST} |grep -viE "^#|备机|ts"|awk '{print $1}') do ssh $i "cd ~/up

shell多进程并发数控制

在批量执行任务是,单进程执行速度太慢,使用&不加数量控制,又担心资源占用过多,导致宕机等问题,因此我们需要控制并发进程的数量,保证效率的同时,保证资源占用不会太高. 其中一个解决思路是利用简单的生产者-消费者模型. 以下为范例脚本: #!/bin/bash fifo_file=a.pipe mkfifo $fifo_file #创建任务队列 exec 5<>$fifo_file rm $fifo_file process_num=10 #并发任务的数量 function task()

Tomcat并发数优化,修改service.xml性能调优 增加最大并发连接数

可以在控制台的启动信息里看见,默认状态下没有被打开nio配置,启动时的信息,如下: 2010-2-1 12:59:40 org.apache.coyote.http11.Http11Protocol init 信息: Initializing Coyote HTTP/1.1 on http-8080 2010-2-1 12:59:40 org.apache.catalina.startup.Catalina load 修改成支持NIO的类型,配置如下: Java代码   <Connector p

在YARN中,如何控制和监控map/reduce的并发数

配置建议: 1.     In MR1, the mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum properties dictated how many map and reduce slots each TaskTracker had. These properties no longer exist in YARN. Instead, YARN uses yarn.nodema

信号量 也是同步锁,可用来控制线程的并发数

# 信号量 也是同步锁,可用来控制线程的并发数 import threading, time class MyThread(threading.Thread): def run(self): if semaphore.acquire(): # 同时运行五个线程,acquire()放一个进程进去计数器-1 print(self.name) # 计数器为0时阻塞线程至同步锁定状态,等待release() time.sleep(3) semaphore.release() # release()运行一

高并发实时弹幕系统 并发数一定是可以进行控制的 每个需要异步处理开启的 Goroutine(Go 协程)都必须预先创建好固定的个数,如果不提前进行控制,那么 Goroutine 就随时存在爆发的可能。

小结: 1.内存优化1.一个消息一定只有一块内存使用 Job 聚合消息,Comet 指针引用. 2.一个用户的内存尽量放到栈上内存创建在对应的用户 Goroutine(Go 程)中. 3.内存由自己控制主要是针对 Comet 模块所做的优化,可以查看模块中各个分配内存的地方,使用内存池. 2.模块优化1.消息分发一定是并行的并且互不干扰要保证到每一个 Comet 的通讯通道必须是相互独立的,保证消息分发必须是完全并列的,并且彼此之间互不干扰. 2.并发数一定是可以进行控制的每个需要异步处理开启的

dubbo是如何控制并发数和限流的?

ExecuteLimitFilter ExecuteLimitFilter ,在服务提供者,通过 的 "executes" 统一配置项开启: 表示每服务的每方法最大可并行执行请求数. ExecuteLimitFilter是通过信号量来实现的对服务端的并发数的控制. ExecuteLimitFilter执行流程: 首先会去获得服务提供者每服务每方法最大可并行执行请求数 如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例 通过