基于Redis实现分布式锁以及任务队列

一、前言

  双十一刚过不久,大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了,另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个时候的先后顺序是很乱的,很容易出现10台的量,抢到的人就不止10个这种严重的问题。那么,以后所说的问题我们该如何去解决呢?

接下来我所分享的技术就可以拿来处理以上的问题: 分布式锁任务队列

二、实现思路

1.Redis实现分布式锁思路

  思路很简单,主要用到的redis函数是setnx(),这个应该是实现分布式锁最主要的函数。首先是将某一任务标识名(这里用Lock:order作为标识名的例子)作为键存到redis里,并为其设个过期时间,如果是还有Lock:order请求过来,先是通过setnx()看看是否能将Lock:order插入到redis里,可以的话就返回true,不可以就返回false。当然,在我的代码里会比这个思路复杂一些,我会在分析代码时进一步说明。

2.Redis实现任务队列

  这里的实现会用到上面的Redis分布式的锁机制,主要是用到了Redis里的有序集合这一数据结构。例如入队时,通过zset的add()函数进行入队,而出对时,可以用到zset的getScore()函数。另外还可以弹出顶部的几个任务。

  以上就是实现 分布式锁 和 任务队列 的简单思路,如果你看完有点模棱两可,那请看接下来的代码实现。

三、代码分析

(一)先来分析Redis分布式锁的代码实现  

(1)为避免特殊原因导致锁无法释放,在加锁成功后,锁会被赋予一个生存时间(通过lock方法的参数设置或者使用默认值),超出生存时间锁会被自动释放锁的生存时间默认比较短(秒级),因此,若需要长时间加锁,可以通过expire方法延长锁的生存时间为适当时间,比如在循环内。

(2)系统级的锁当进程无论何种原因时出现crash时,操作系统会自己回收锁,所以不会出现资源丢失,但分布式锁不用,若一次性设置很长时间,一旦由于各种原因出现进程crash 或者其他异常导致unlock未被调用时,则该锁在剩下的时间就会变成垃圾锁,导致其他进程或者进程重启后无法进入加锁区域。

先看加锁的实现代码:这里需要主要两个参数,一个是$timeout,这个是循环获取锁的等待时间,在这个时间内会一直尝试获取锁知道超时,如果为0,则表示获取锁失败后直接返回而不再等待;另一个重要参数的$expire,这个参数指当前锁的最大生存时间,以秒为单位的,它必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放。这个参数的最要作用请看上面的(1)里的解释。

  这里先取得当前时间,然后再获取到锁失败时的等待超时的时刻(是个时间戳),再获取到锁的最大生存时刻是多少。这里redis的key用这种格式:"Lock:锁的标识名",这里就开始进入循环了,先是插入数据到redis里,使用setnx()函数,这函数的意思是,如果该键不存在则插入数据,将最大生存时刻作为值存储,假如插入成功,则对该键进行失效时间的设置,并将该键放在$lockedName数组里,返回true,也就是上锁成功;如果该键存在,则不会插入操作了,这里有一步严谨的操作,那就是取得当前键的剩余时间,假如这个时间小于0,表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用,这时可以直接设置expire并把锁纳为己用。如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出循环,反之则 隔 $waitIntervalUs 后继续 请求。  这就是加锁的整一个代码分析。

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

/**

   * 加锁

   * @param [type] $name      锁的标识名

   * @param integer $timeout    循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待

   * @param integer $expire     当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放

   * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)

   * @return [type]         [description]

   */

  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {

    if ($name == null) return false;

    //取得当前时间

    $now = time();

    //获取锁失败时的等待超时时刻

    $timeoutAt = $now + $timeout;

    //锁的最大生存时刻

    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";

    while (true) {

      //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放

      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {

        //设置key的失效时间

        $this->redisString->expire($redisKey, $expireAt);

        //将锁标志放到lockedNames数组里

        $this->lockedNames[$name] = $expireAt;

        return true;

      }

      //以秒为单位,返回给定key的剩余生存时间

      $ttl = $this->redisString->ttl($redisKey);

      //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)

      //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用

      //这时可以直接设置expire并把锁纳为己用

      if ($ttl < 0) {

        $this->redisString->set($redisKey, $expireAt);

        $this->lockedNames[$name] = $expireAt;

        return true;

      }

      /*****循环请求锁部分*****/

      //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出

      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 后继续 请求

      usleep($waitIntervalUs);

    }

    return false;

  }

  接着看解锁的代码分析:解锁就简单多了,传入参数就是锁标识,先是判断是否存在该锁,存在的话,就从redis里面通过deleteKey()函数删除掉锁标识即可。

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

/**

   * 解锁

   * @param [type] $name [description]

   * @return [type]    [description]

   */

  public function unlock($name) {

    //先判断是否存在此锁

    if ($this->isLocking($name)) {

      //删除锁

      if ($this->redisString->deleteKey("Lock:$name")) {

        //清掉lockedNames里的锁标志

        unset($this->lockedNames[$name]);

        return true;

      }

    }

    return false;

  }

    在贴上删除掉所有锁的方法,其实都一个样,多了个循环遍历而已。

/**

   * 释放当前所有获得的锁

   * @return [type] [description]

   */

  public function unlockAll() {

    //此标志是用来标志是否释放所有锁成功

    $allSuccess = true;

    foreach ($this->lockedNames as $name => $expireAt) {

      if (false === $this->unlock($name)) {

        $allSuccess = false

      }

    }

    return $allSuccess;

  }

  以上就是用Redis实现分布式锁的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能模拟应用。想要深入了解的请看整个类的代码:

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

/**

 *在redis上实现分布式锁

 */

class RedisLock {

  private $redisString;

  private $lockedNames = [];

  public function __construct($param = NULL) {

    $this->redisString = RedisFactory::get($param)->string;

  }

  /**

   * 加锁

   * @param [type] $name      锁的标识名

   * @param integer $timeout    循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待

   * @param integer $expire     当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放

   * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)

   * @return [type]         [description]

   */

  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {

    if ($name == null) return false;

    //取得当前时间

    $now = time();

    //获取锁失败时的等待超时时刻

    $timeoutAt = $now + $timeout;

    //锁的最大生存时刻

    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";

    while (true) {

      //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放

      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {

        //设置key的失效时间

        $this->redisString->expire($redisKey, $expireAt);

        //将锁标志放到lockedNames数组里

        $this->lockedNames[$name] = $expireAt;

        return true;

      }

      //以秒为单位,返回给定key的剩余生存时间

      $ttl = $this->redisString->ttl($redisKey);

      //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)

      //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用

      //这时可以直接设置expire并把锁纳为己用

      if ($ttl < 0) {

        $this->redisString->set($redisKey, $expireAt);

        $this->lockedNames[$name] = $expireAt;

        return true;

      }

      /*****循环请求锁部分*****/

      //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出

      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 后继续 请求

      usleep($waitIntervalUs);

    }

    return false;

  }

  /**

   * 解锁

   * @param [type] $name [description]

   * @return [type]    [description]

   */

  public function unlock($name) {

    //先判断是否存在此锁

    if ($this->isLocking($name)) {

      //删除锁

      if ($this->redisString->deleteKey("Lock:$name")) {

        //清掉lockedNames里的锁标志

        unset($this->lockedNames[$name]);

        return true;

      }

    }

    return false;

  }

  /**

   * 释放当前所有获得的锁

   * @return [type] [description]

   */

  public function unlockAll() {

    //此标志是用来标志是否释放所有锁成功

    $allSuccess = true;

    foreach ($this->lockedNames as $name => $expireAt) {

      if (false === $this->unlock($name)) {

        $allSuccess = false

      }

    }

    return $allSuccess;

  }

  /**

   * 给当前所增加指定生存时间,必须大于0

   * @param [type] $name [description]

   * @return [type]    [description]

   */

  public function expire($name, $expire) {

    //先判断是否存在该锁

    if ($this->isLocking($name)) {

      //所指定的生存时间必须大于0

      $expire = max($expire, 1);

      //增加锁生存时间

      if ($this->redisString->expire("Lock:$name", $expire)) {

        return true;

      }

    }

    return false;

  }

  /**

   * 判断当前是否拥有指定名字的所

   * @param [type] $name [description]

   * @return boolean    [description]

   */

  public function isLocking($name) {

    //先看lonkedName[$name]是否存在该锁标志名

    if (isset($this->lockedNames[$name])) {

      //从redis返回该锁的生存时间

      return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name");

    }

    return false;

  }

}

(二)用Redis实现任务队列的代码分析

(1)任务队列,用于将业务逻辑中可以异步处理的操作放入队列中,在其他线程中处理后出队

(2)队列中使用了分布式锁和其他逻辑,保证入队和出队的一致性

(3)这个队列和普通队列不一样,入队时的id是用来区分重复入队的,队列里面只会有一条记录,同一个id后入的覆盖前入的,而不是追加, 如果需求要求重复入队当做不用的任务,请使用不同的id区分

  先看入队的代码分析:首先当然是对参数的合法性检测,接着就用到上面加锁机制的内容了,就是开始加锁,入队时我这里选择当前时间戳作为score,接着就是入队了,使用的是zset数据结构的add()方法,入队完成后,就对该任务解锁,即完成了一个入队的操作。

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

/**

   * 入队一个 Task

   * @param [type] $name     队列名称

   * @param [type] $id      任务id(或者其数组)

   * @param integer $timeout    入队超时时间(秒)

   * @param integer $afterInterval [description]

   * @return [type]         [description]

   */

  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {

    //合法性检测

    if (empty($name) || empty($id) || $timeout <= 0) return false;

    //加锁

    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {

      Logger::get(‘queue‘)->error("enqueue faild becouse of lock failure: name = $name, id = $id");

      return false;

    }

    

    //入队时以当前时间戳作为 score

    $score = microtime(true) + $afterInterval;

    //入队

    foreach ((array)$id as $item) {

      //先判断下是否已经存在该id了

      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {

        $this->_redis->zset->add("Queue:$name", $score, $item);

      }

    }

    

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  接着来看一下出队的代码分析:出队一个Task,需要指定它的$id 和
$score,如果$score与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理。首先和对参数进行合法性检测,接着又用到加锁的功能了,然后及时出队了,先使用getScore()从Redis里获取到该id的score,然后将传入的$score和Redis里存储的score进行对比,如果两者相等就进行出队操作,也就是使用zset里的delete()方法删掉该任务id,最后当前就是解锁了。这就是出队的代码分析。

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

/**

   * 出队一个Task,需要指定$id 和 $score

   * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理

   *

   * @param [type] $name  队列名称

   * @param [type] $id   任务标识

   * @param [type] $score  任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队

   * @param integer $timeout 超时时间(秒)

   * @return [type]      Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)

   */

  public function dequeue($name, $id, $score, $timeout = 10) {

    //合法性检测

    if (empty($name) || empty($id) || empty($score)) return false;

    

    //加锁

    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {

      Logger:get(‘queue‘)->error("dequeue faild becouse of lock lailure:name=$name, id = $id");

      return false;

    }

    

    //出队

    //先取出redis的score

    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);

    $result = false;

    //先判断传进来的score和redis的score是否是一样

    if ($serverScore == $score) {

      //删掉该$id

      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);

      if ($result == false) {

        Logger::get(‘queue‘)->error("dequeue faild because of redis delete failure: name =$name, id = $id");

      }

    }

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return $result;

  }

  学过数据结构这门课的朋友都应该知道,队列操作还有弹出顶部某个值的方法等等,这里处理入队出队操作,我还实现了 获取队列顶部若干个Task 并将其出队的方法,想了解的朋友可以看这段代码,假如看不太明白就留言,这里我不再对其进行分析了。

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

/**

   * 获取队列顶部若干个Task 并将其出队

   * @param [type] $name  队列名称

   * @param integer $count  数量

   * @param integer $timeout 超时时间

   * @return [type]      返回数组[0=>[‘id‘=> , ‘score‘=> ], 1=>[‘id‘=> , ‘score‘=> ], 2=>[‘id‘=> , ‘score‘=> ]]

   */

  public function pop($name, $count = 1, $timeout = 10) {

    //合法性检测

    if (empty($name) || $count <= 0) return [];

    

    //加锁

    if (!$this->_redis->lock->lock("Queue:$name")) {

      Log::get(‘queue‘)->error("pop faild because of pop failure: name = $name, count = $count");

      return false;

    }

    

    //取出若干的Task

    $result = [];

    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //将其放在$result数组里 并 删除掉redis对应的id

    foreach ($array as $id => $score) {

      $result[] = [‘id‘=>$id, ‘score‘=>$score];

      $this->_redis->zset->delete("Queue:$name", $id);

    }

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;

  }

  以上就是用Redis实现任务队列的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能模拟应用。想要深入了解的请看整个类的代码:

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

/**

 * 任务队列

 *

 */

class RedisQueue {

  private $_redis;

  public function __construct($param = null) {

    $this->_redis = RedisFactory::get($param);

  }

  /**

   * 入队一个 Task

   * @param [type] $name     队列名称

   * @param [type] $id      任务id(或者其数组)

   * @param integer $timeout    入队超时时间(秒)

   * @param integer $afterInterval [description]

   * @return [type]         [description]

   */

  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {

    //合法性检测

    if (empty($name) || empty($id) || $timeout <= 0) return false;

    //加锁

    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {

      Logger::get(‘queue‘)->error("enqueue faild becouse of lock failure: name = $name, id = $id");

      return false;

    }

    

    //入队时以当前时间戳作为 score

    $score = microtime(true) + $afterInterval;

    //入队

    foreach ((array)$id as $item) {

      //先判断下是否已经存在该id了

      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {

        $this->_redis->zset->add("Queue:$name", $score, $item);

      }

    }

    

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  /**

   * 出队一个Task,需要指定$id 和 $score

   * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理

   *

   * @param [type] $name  队列名称

   * @param [type] $id   任务标识

   * @param [type] $score  任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队

   * @param integer $timeout 超时时间(秒)

   * @return [type]      Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)

   */

  public function dequeue($name, $id, $score, $timeout = 10) {

    //合法性检测

    if (empty($name) || empty($id) || empty($score)) return false;

    

    //加锁

    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {

      Logger:get(‘queue‘)->error("dequeue faild becouse of lock lailure:name=$name, id = $id");

      return false;

    }

    

    //出队

    //先取出redis的score

    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);

    $result = false;

    //先判断传进来的score和redis的score是否是一样

    if ($serverScore == $score) {

      //删掉该$id

      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);

      if ($result == false) {

        Logger::get(‘queue‘)->error("dequeue faild because of redis delete failure: name =$name, id = $id");

      }

    }

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return $result;

  }

  /**

   * 获取队列顶部若干个Task 并将其出队

   * @param [type] $name  队列名称

   * @param integer $count  数量

   * @param integer $timeout 超时时间

   * @return [type]      返回数组[0=>[‘id‘=> , ‘score‘=> ], 1=>[‘id‘=> , ‘score‘=> ], 2=>[‘id‘=> , ‘score‘=> ]]

   */

  public function pop($name, $count = 1, $timeout = 10) {

    //合法性检测

    if (empty($name) || $count <= 0) return [];

    

    //加锁

    if (!$this->_redis->lock->lock("Queue:$name")) {

      Logger::get(‘queue‘)->error("pop faild because of pop failure: name = $name, count = $count");

      return false;

    }

    

    //取出若干的Task

    $result = [];

    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //将其放在$result数组里 并 删除掉redis对应的id

    foreach ($array as $id => $score) {

      $result[] = [‘id‘=>$id, ‘score‘=>$score];

      $this->_redis->zset->delete("Queue:$name", $id);

    }

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;

  }

  /**

   * 获取队列顶部的若干个Task

   * @param [type] $name 队列名称

   * @param integer $count 数量

   * @return [type]     返回数组[0=>[‘id‘=> , ‘score‘=> ], 1=>[‘id‘=> , ‘score‘=> ], 2=>[‘id‘=> , ‘score‘=> ]]

   */

  public function top($name, $count = 1) {

    //合法性检测

    if (empty($name) || $count < 1) return [];

    //取错若干个Task

    $result = [];

    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    

    //将Task存放在数组里

    foreach ($array as $id => $score) {

      $result[] = [‘id‘=>$id, ‘score‘=>$score];

    }

    //返回数组

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;   

  }

}

  到此,这两大块功能基本讲解完毕,对于任务队列,你可以写一个shell脚本,让服务器定时运行某些程序,实现入队出队等操作,这里我就不在将其与实际应用结合起来去实现了,大家理解好这两大功能的实现思路即可,由于代码用的是PHP语言来写的,如果你理解了实现思路,你完全可以使用java或者是.net等等其他语言去实现这两个功能。这两大功能的应用场景十分多,特别是秒杀,另一个就是春运抢火车票,这两个是最鲜明的例子了。当然还有很多地方用到,这里我不再一一列举。

  好了,本次总结和分享到此完毕。最后我附上分布式锁和任务队列这两个类:

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

/**

 *在redis上实现分布式锁

 */

class RedisLock {

  private $redisString;

  private $lockedNames = [];

  public function __construct($param = NULL) {

    $this->redisString = RedisFactory::get($param)->string;

  }

  /**

   * 加锁

   * @param [type] $name      锁的标识名

   * @param integer $timeout    循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待

   * @param integer $expire     当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放

   * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)

   * @return [type]         [description]

   */

  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {

    if ($name == null) return false;

    //取得当前时间

    $now = time();

    //获取锁失败时的等待超时时刻

    $timeoutAt = $now + $timeout;

    //锁的最大生存时刻

    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";

    while (true) {

      //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放

      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {

        //设置key的失效时间

        $this->redisString->expire($redisKey, $expireAt);

        //将锁标志放到lockedNames数组里

        $this->lockedNames[$name] = $expireAt;

        return true;

      }

      //以秒为单位,返回给定key的剩余生存时间

      $ttl = $this->redisString->ttl($redisKey);

      //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)

      //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用

      //这时可以直接设置expire并把锁纳为己用

      if ($ttl < 0) {

        $this->redisString->set($redisKey, $expireAt);

        $this->lockedNames[$name] = $expireAt;

        return true;

      }

      /*****循环请求锁部分*****/

      //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出

      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 后继续 请求

      usleep($waitIntervalUs);

    }

    return false;

  }

  /**

   * 解锁

   * @param [type] $name [description]

   * @return [type]    [description]

   */

  public function unlock($name) {

    //先判断是否存在此锁

    if ($this->isLocking($name)) {

      //删除锁

      if ($this->redisString->deleteKey("Lock:$name")) {

        //清掉lockedNames里的锁标志

        unset($this->lockedNames[$name]);

        return true;

      }

    }

    return false;

  }

  /**

   * 释放当前所有获得的锁

   * @return [type] [description]

   */

  public function unlockAll() {

    //此标志是用来标志是否释放所有锁成功

    $allSuccess = true;

    foreach ($this->lockedNames as $name => $expireAt) {

      if (false === $this->unlock($name)) {

        $allSuccess = false

      }

    }

    return $allSuccess;

  }

  /**

   * 给当前所增加指定生存时间,必须大于0

   * @param [type] $name [description]

   * @return [type]    [description]

   */

  public function expire($name, $expire) {

    //先判断是否存在该锁

    if ($this->isLocking($name)) {

      //所指定的生存时间必须大于0

      $expire = max($expire, 1);

      //增加锁生存时间

      if ($this->redisString->expire("Lock:$name", $expire)) {

        return true;

      }

    }

    return false;

  }

  /**

   * 判断当前是否拥有指定名字的所

   * @param [type] $name [description]

   * @return boolean    [description]

   */

  public function isLocking($name) {

    //先看lonkedName[$name]是否存在该锁标志名

    if (isset($this->lockedNames[$name])) {

      //从redis返回该锁的生存时间

      return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name");

    }

    return false;

  }

}

/**

 * 任务队列

 */

class RedisQueue {

  private $_redis;

  public function __construct($param = null) {

    $this->_redis = RedisFactory::get($param);

  }

  /**

   * 入队一个 Task

   * @param [type] $name     队列名称

   * @param [type] $id      任务id(或者其数组)

   * @param integer $timeout    入队超时时间(秒)

   * @param integer $afterInterval [description]

   * @return [type]         [description]

   */

  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {

    //合法性检测

    if (empty($name) || empty($id) || $timeout <= 0) return false;

    //加锁

    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {

      Logger::get(‘queue‘)->error("enqueue faild becouse of lock failure: name = $name, id = $id");

      return false;

    }

    

    //入队时以当前时间戳作为 score

    $score = microtime(true) + $afterInterval;

    //入队

    foreach ((array)$id as $item) {

      //先判断下是否已经存在该id了

      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {

        $this->_redis->zset->add("Queue:$name", $score, $item);

      }

    }

    

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  /**

   * 出队一个Task,需要指定$id 和 $score

   * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理

   *

   * @param [type] $name  队列名称

   * @param [type] $id   任务标识

   * @param [type] $score  任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队

   * @param integer $timeout 超时时间(秒)

   * @return [type]      Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)

   */

  public function dequeue($name, $id, $score, $timeout = 10) {

    //合法性检测

    if (empty($name) || empty($id) || empty($score)) return false;

    

    //加锁

    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {

      Logger:get(‘queue‘)->error("dequeue faild becouse of lock lailure:name=$name, id = $id");

      return false;

    }

    

    //出队

    //先取出redis的score

    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);

    $result = false;

    //先判断传进来的score和redis的score是否是一样

    if ($serverScore == $score) {

      //删掉该$id

      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);

      if ($result == false) {

        Logger::get(‘queue‘)->error("dequeue faild because of redis delete failure: name =$name, id = $id");

      }

    }

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return $result;

  }

  /**

   * 获取队列顶部若干个Task 并将其出队

   * @param [type] $name  队列名称

   * @param integer $count  数量

   * @param integer $timeout 超时时间

   * @return [type]      返回数组[0=>[‘id‘=> , ‘score‘=> ], 1=>[‘id‘=> , ‘score‘=> ], 2=>[‘id‘=> , ‘score‘=> ]]

   */

  public function pop($name, $count = 1, $timeout = 10) {

    //合法性检测

    if (empty($name) || $count <= 0) return [];

    

    //加锁

    if (!$this->_redis->lock->lock("Queue:$name")) {

      Logger::get(‘queue‘)->error("pop faild because of pop failure: name = $name, count = $count");

      return false;

    }

    

    //取出若干的Task

    $result = [];

    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //将其放在$result数组里 并 删除掉redis对应的id

    foreach ($array as $id => $score) {

      $result[] = [‘id‘=>$id, ‘score‘=>$score];

      $this->_redis->zset->delete("Queue:$name", $id);

    }

    //解锁

    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;

  }

  /**

   * 获取队列顶部的若干个Task

   * @param [type] $name 队列名称

   * @param integer $count 数量

   * @return [type]     返回数组[0=>[‘id‘=> , ‘score‘=> ], 1=>[‘id‘=> , ‘score‘=> ], 2=>[‘id‘=> , ‘score‘=> ]]

   */

  public function top($name, $count = 1) {

    //合法性检测

    if (empty($name) || $count < 1) return [];

    //取错若干个Task

    $result = [];

    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    

    //将Task存放在数组里

    foreach ($array as $id => $score) {

      $result[] = [‘id‘=>$id, ‘score‘=>$score];

    }

    //返回数组

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;   

  }

}

以上就是本文的全部内容,希望对大家的学习有所帮助。

原文地址:https://www.cnblogs.com/jimcsharp/p/8576986.html

时间: 2024-08-30 10:34:52

基于Redis实现分布式锁以及任务队列的相关文章

基于Redis的分布式锁到底安全吗(上)?

网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现算法也看似合乎逻辑,但当我们着手去实现它们的时候,却发现如果你越是仔细推敲,疑虑也就越来越多. 实际上,大概在一年以前,关于Redis分布式锁的安全性问题,在分布式系统专家Martin Kleppmann和Redis的作者antirez之间就发生过一场争论.由于对这个问题一直以来比较关注,所以我前些日子仔细阅读了与这场争

基于redis的分布式锁

<?php /** * 基于redis的分布式锁 * * 参考开源代码: * http://nleach.com/post/31299575840/redis-mutex-in-php * * https://gist.github.com/nickyleach/3694555 */ pc_base::load_sys_class('cache_redis', '', 0); class dist_key_redis { //锁的超时时间 const TIMEOUT = 20; const SL

转载:基于Redis实现分布式锁

转载:基于Redis实现分布式锁  ,出处: http://blog.csdn.net/ugg/article/details/41894947 背景在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等.大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系.其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制. Redis命令介绍使用Redis实现分

基于redis的分布式锁(不适合用于生产环境)

基于redis的分布式锁 1 介绍 这篇博文讲介绍如何一步步构建一个基于Redis的分布式锁.会从最原始的版本开始,然后根据问题进行调整,最后完成一个较为合理的分布式锁. 本篇文章会将分布式锁的实现分为两部分,一个是单机环境,另一个是集群环境下的Redis锁实现.在介绍分布式锁的实现之前,先来了解下分布式锁的一些信息. 2 分布式锁 2.1 什么是分布式锁? 分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥

基于redis的分布式锁实现

关于分布式锁 很久之前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock.在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量.而同步的本质是通过锁来实现的.为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记.

[Redis] 基于redis的分布式锁

前言分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁. 可靠性首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性.在任意时刻,只有一个客户端能持有锁.不会发生死锁.即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁.具有容错性.只要大部分的Redis节点正常运行,客户端就可以加锁和解锁.解铃还须系铃人.加锁和解锁必须

python基于redis实现分布式锁

阅读目录 什么事分布式锁 基于redis实现分布式锁 一.什么是分布式锁 我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,并且可以完美的运行,毫无Bug! 注意这是单机应用,后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图: 上图可以看到,变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变

基于redis的分布式锁的分析与实践

转:https://my.oschina.net/wnjustdoit/blog/1606215 前言:在分布式环境中,我们经常使用锁来进行并发控制,锁可分为乐观锁和悲观锁,基于数据库版本戳的实现是乐观锁,基于redis或zookeeper的实现可认为是悲观锁了.乐观锁和悲观锁最根本的区别在于线程之间是否相互阻塞. 那么,本文主要来讨论基于redis的分布式锁算法问题. 从2.6.12版本开始,redis为SET命令增加了一系列选项(SET key value [EX seconds] [PX

基于Redis的分布式锁和Redlock算法

1 前言 前面写了4篇Redis底层实现和工程架构相关文章,感兴趣的读者可以回顾一下: Redis面试热点之底层实现篇-1 Redis面试热点之底层实现篇-2 Redis面试热点之工程架构篇-1 Redis面试热点之工程架构篇-2 今天开始来和大家一起学习一下Redis实际应用篇,会写几个Redis的常见应用. 在我看来Redis最为典型的应用就是作为分布式缓存系统,其他的一些应用本质上并不是杀手锏功能,是基于Redis支持的数据类型和分布式架构来实现的,属于小而美的应用. 结合笔者的日常工作,