Redis锁构造

单线程与隔离性

Redis是使用单线程的方式来执行事务的,事务以串行的方式运行,也就是说Redis中单个命令的执行和事务的执行都是线程安全的,不会相互影响,具有隔离性。

在多线程编程中,对于共享资源的访问要十分的小心:

import threading

num = 1
lock = threading.Lock()

def change_num():
    global num
    for i in xrange(100000):
        #lock.acquire()
        num += 5
        num -= 5
        #lock.release()

if __name__ == ‘__main__‘:
    pool = [threading.Thread(target=change_num) for i in xrange(5)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    print num

在不加锁的情况下,num是不能保持为1的。

而在Redis中,并发执行单个命令具有很好的隔离性:

import redis

conn = redis.StrictRedis(host="localhost", port=6379, db=1)
conn.set(‘num‘, 1)

def change_num(conn):
    for i in xrange(100000):
    ┆   conn.incr(‘num‘, 5)
    ┆   conn.decr(‘num‘, 5)

if __name__ == ‘__main__‘:
    conn_pool = [redis.StrictRedis(host="localhost", port=6379, db=1)
                 for i in xrange(5)]
    t_pool = []
    for conn in conn_pool:
        t = threading.Thread(target=change_num, args=(conn,))
        t_pool.append(t)
    for t in t_pool:
        t.start()
    for t in t_pool:
        t.join()
    print conn.get(‘num‘)

模拟的5个客户端同时对Redis中的num值进行操作,num最终结果会保持为1:

1
real	0m46.463s
user	0m28.748s
sys	0m6.276s

利用Redis中单个操作和事务的原子性可以做很多事情,最简单的就是做全局计数器了。

比如在短信验证码业务中,要限制一个用户在一分钟内只能发送一次,如果使用关系型数据库,需要为每个手机号记录上次发送短信的时间,当用户请求验证码时,取出与当前时间进行对比。

这一情况下,当用户短时间点击多次时,不仅增加了数据库压力,而且还会出现同时查询均符合条件但数据库更新短信发送时间较慢的问题,就会重复发送短信了。

在Redis中解决这一问题就很简单,只需要用手机号作为key创建一个生存期限为一分钟的数值即可。key不存在时能发送短信,存在时则不能发送短信:

def can_send(phone):
    key = "message:" + str(phone)
    if conn.set(key, 0, nx=True, ex=60):
    ┆   return True
    else:
    ┆   return False

至于一些不可名的30分钟内限制访问或者下载5次的功能,将用户ip作为key,值设为次数上限,过期时间设为限制时间,每次用户访问时自减即可:

def can_download(ip):
    key = "ip:" + str(ip)
    conn.set(key, 5, nx=True, ex=600)
    if conn.decr(key) >= 0:
    ┆   return True
    else:
    ┆   return False

Redis基本事务与乐观锁

虽然Redis单个命令具有原子性,但当多个命令并行执行的时候,会有更多的问题。

比如举一个转账的例子,将用户A的钱转给用户B,那么用户A的账户减少需要与B账户的增多同时进行:

import threading
import time

import redis

conn = redis.StrictRedis(host="localhost", port=6379, db=1)
conn.mset(a_num=10, b_num=10)

def a_to_b():
    if int(conn.get(‘a_num‘)) >= 10:
        conn.decr(‘a_num‘, 10)
        time.sleep(.1)
        conn.incr(‘b_num‘, 10)
    print conn.mget(‘a_num‘, "b_num")

def b_to_a():
    if int(conn.get(‘b_num‘)) >= 10:
        conn.decr(‘b_num‘, 10)
        time.sleep(.1)
        conn.incr(‘a_num‘, 10)
    print conn.mget(‘a_num‘, "b_num")

if __name__ == ‘__main__‘:
    pool = [threading.Thread(target=a_to_b) for i in xrange(3)]
    for t in pool:
        t.start()

    pool = [threading.Thread(target=b_to_a) for i in xrange(3)]
    for t in pool:
        t.start()

运行结果:

[‘0‘, ‘10‘]
[‘0‘, ‘10‘]
[‘0‘, ‘0‘]
[‘0‘, ‘0‘]
[‘0‘, ‘10‘]
[‘10‘, ‘10‘]

出现了账户总额变少的情况。虽然是人为的为自增自减命令之间添加了100ms延迟,但在实际并发很高的情况中是很可能出现的,两个命令执行期间执行了其它的语句。

那么现在要保证的是两个增减命令执行期间不受其它命令的干扰,Redis的事务可以达到这一目的。

Redis中,被MULTI命令和EXEC命令包围的所有命令会一个接一个的执行,直到所有命令都执行完毕为止。一个事务完毕后,Redis才会去处理其它的命令。也就是说,Redis事务是具有原子性的。

python中可以用pipeline来创建事务:

def a_to_b():
    if int(conn.get(‘a_num‘)) >= 10:
    ┆   pipeline = conn.pipeline()
    ┆   pipeline.decr(‘a_num‘, 10)
    ┆   time.sleep(.1)
    ┆   pipeline.incr(‘b_num‘, 10)
    ┆   pipeline.execute()
    print conn.mget(‘a_num‘, "b_num")

def b_to_a():
    if int(conn.get(‘b_num‘)) >= 10:
    ┆   pipeline = conn.pipeline()
    ┆   pipeline.decr(‘b_num‘, 10)
    ┆   time.sleep(.1)
    ┆   pipeline.incr(‘a_num‘, 10)
    ┆   pipeline.execute()
    print conn.mget(‘a_num‘, "b_num")

结果:

[‘0‘, ‘20‘]
[‘10‘, ‘10‘]
 [‘-10‘, ‘30‘]
[‘-10‘, ‘30‘]
[‘0‘, ‘20‘]
[‘10‘, ‘10‘]

可以看到,两条语句确实一起执行了,账户总额不会变,但出现了负值的情况。这是因为事务在exec命令被调用之前是不会执行的,所以用读取的数据做判断与事务执行之间就有了时间差,期间实际数据发生了变化。

为了保持数据的一致性,我们还需要用到一个事务命令WATCH。WATCH可以对一个键进行监视,监视后到EXEC命令执行之前,如果被监视的键值发生了变化(替换,更新,删除等),EXEC命令会返回一个错误,而不会真正的执行:

>>> pipeline.watch(‘a_num‘)
True
>>> pipeline.multi()
>>> pipeline.incr(‘a_num‘,10)
StrictPipeline<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>
>>> pipeline.execute()
[20]
>>> pipeline.watch(‘a_num‘)
True
>>> pipeline.incr(‘a_num‘,10) #监视期间改变被监视键的值
30
>>> pipeline.multi()
>>> pipeline.incr(‘a_num‘,10)
StrictPipeline<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>
>>> pipeline.execute()
    raise WatchError("Watched variable changed.")
redis.exceptions.WatchError: Watched variable changed.

现在为代码加上watch:

def a_to_b():
      pipeline = conn.pipeline()
      try:
      ┆   pipeline.watch(‘a_num‘)
      ┆   if int(pipeline.get(‘a_num‘)) < 10:
      ┆   ┆   pipeline.unwatch()
      ┆   ┆   return
      ┆   pipeline.multi()
      ┆   pipeline.decr(‘a_num‘, 10)
      ┆   pipeline.incr(‘b_num‘, 10)
      ┆   pipeline.execute()
      except redis.exceptions.WatchError:
      ┆   pass
      print conn.mget(‘a_num‘, "b_num")

  def b_to_a():
      pipeline = conn.pipeline()
      try:
      ┆   pipeline.watch(‘b_num‘)
      ┆   if int(pipeline.get(‘b_num‘)) < 10:
      ┆   ┆   pipeline.unwatch()
      ┆   ┆   return
      ┆   pipeline.multi()
      ┆   pipeline.decr(‘b_num‘, 10)
      ┆   pipeline.incr(‘a_num‘, 10)
      ┆   pipeline.execute()
      except redis.exceptions.WatchError:
      ┆   pass
      print conn.mget(‘a_num‘, "b_num")

结果:

[‘0‘, ‘20‘]
[‘10‘, ‘10‘]
[‘20‘, ‘0‘]

成功实现了账户转移,但是有三次尝试失败了,如果要尽可能的使每次交易都获得成功,可以加尝试次数或者尝试时间:

def a_to_b():
    pipeline = conn.pipeline()
    end = time.time() + 5
    while time.time() < end:
    ┆   try:
    ┆   ┆   pipeline.watch(‘a_num‘)
    ┆   ┆   if int(pipeline.get(‘a_num‘)) < 10:
    ┆   ┆   ┆   pipeline.unwatch()
    ┆   ┆   ┆   return
    ┆   ┆   pipeline.multi()
    ┆   ┆   pipeline.decr(‘a_num‘, 10)
    ┆   ┆   pipeline.incr(‘b_num‘, 10)
    ┆   ┆   pipeline.execute()
    ┆   ┆   return True
    ┆   except redis.exceptions.WatchError:
    ┆   ┆   pass
    return False

这样,Redis可以使用事务实现类似于锁的机制,但这个机制与关系型数据库的锁有所不同。关系型数据库对被访问的数据行进行加锁时,其它客户端尝试对被加锁数据行进行写入是会被阻塞的。

Redis执行WATCH时并不会对数据进行加锁,如果发现数据已经被其他客户端抢先修改,只会通知执行WATCH命令的客户端,并不会阻止修改,这称之为乐观锁。

用SET()构建锁

用WACTH实现的乐观锁一般情况下是适用的,但存在一个问题,程序会为完成一个执行失败的事务而不断地进行重试。当负载增加的时候,重试次数会上升到一个不可接受的地步。

如果要自己正确的实现锁的话,要避免下面几个情况:

  • 多个进程同时获得了锁
  • 持有锁的进程在释放锁之前崩溃了,而其他进程却不知道
  • 持有锁的进行运行时间过长,锁被自动释放了,进程本身不知道,还会尝试去释放锁

Redis中要实现锁,需要用到一个命令,SET()或者说是SETNX()。SETNX只会在键不存在的情况下为键设置值,现在SET命令在加了NX选项的情况下也能实现这个功能,而且还能设置过期时间,简直就是天生用来构建锁的。

只要以需要加锁的资源名为key设置一个值,要获取锁时,检查这个key存不存在即可。若存在,则资源已被其它进程获取,需要阻塞到其它进程释放,若不存在,则建立key并获取锁:

import time
import uuid

class RedisLock(object):

    def __init__(self, conn, lockname, retry_count=3, timeout=10,):
        self.conn = conn
        self.lockname = ‘lock:‘ + lockname
        self.retry_count = int(retry_count)
        self.timeout = int(timeout)
        self.unique_id = str(uuid.uuid4())

    def acquire(self):
        retry = 0
        while retry < self.retry_count:
            if self.conn.set(lockname, self.unique_id, nx=True, ex=self.timeout):
                return self.unique_id
            retry += 1
            time.sleep(.001)
        return False

    def release(self):
        if self.conn.get(self.lockname) == self.unique_id:
            self.conn.delete(self.lockname)
            return True
        else:
            return False

获取锁的默认尝试次数限制3次,3次获取失败则返回。锁的生存期限默认设为了10s,若不主动释放锁,10s后锁会自动消除。

还保存了获取锁时锁设置的值,当释放锁的时候,会先判断保存的值和当前锁的值是否一样,如果不一样,说明是锁过期被自动释放然后被其它进程获取了。所以锁的值必须保持唯一,以免释放了其它程序获取的锁。

使用锁:

def a_to_b():
    lock = Redlock(conn, ‘a_num‘)
    if not lock.acquire():
    ┆   return False

    pipeline = conn.pipeline()
    try:
    ┆   pipeline.get(‘a_num‘)
    ┆   (a_num,) = pipeline.execute()
    ┆   if int(a_num) < 10:
    ┆   ┆   return False
    ┆   pipeline.decr(‘a_num‘, 10)
    ┆   pipeline.incr(‘b_num‘, 10)
    ┆   pipeline.execute()
    ┆   return True
    finally:
    ┆   lock.release()

释放锁时也可以用Lua脚本来告诉Redis:删除这个key当且仅当这个key存在而且值是我期望的那个值:

    unlock_script = """
    if redis.call("get",KEYS[1]) == ARGV[1] then
    ┆   return redis.call("del",KEYS[1])
    else
    ┆   return 0
    end"""

可以用conn.eval来运行Lua脚本:

    def release(self):
    ┆   self.conn.eval(unlock_script, 1, self.lockname, self.unique_id)

这样,一个Redis单机锁就实现了。我们可以用这个锁来代替WATCH,或者与WACTH同时使用。

实际使用中还要根据业务来决定锁的粒度的问题,是锁住整个结构还是锁住结构中的一小部分。

粒度越大,性能越差,粒度越小,发生死锁的几率越大。

原文地址:https://www.cnblogs.com/linxiyue/p/8185933.html

时间: 2024-08-30 02:08:39

Redis锁构造的相关文章

redis锁 和悲观锁的并发问题

1.在业务流程前后中,用到了redis锁 和 悲观锁两种不同的锁. 2.汇总账单的时候,从库中读取数据,将读取到的实收额也跟着更新,而在收费的时候添加了悲观锁, 在读账单表的时候 用到了 forupdate,但是redis锁那块同样会产生并发,因为redis锁那块在查询库的时候也需要对账单for update,这样可以防止并发,在悲观锁里若还没更新 则redis锁不去执行更新 3.解决方案 有上面的一个在 redis锁中的查询账单表的时候同样 for update,另外一种则是 对与我们业务相关

(实例篇)php 使用redis锁限制并发访问类示例

1.并发访问限制问题 对于一些需要限制同一个用户并发访问的场景,如果用户并发请求多次,而服务器处理没有加锁限制,用户则可以多次请求成功. 例如换领优惠券,如果用户同一时间并发提交换领码,在没有加锁限制的情况下,用户则可以使用同一个换领码同时兑换到多张优惠券. 伪代码如下: if A(可以换领)         B(执行换领)              C(更新为已换领)             D(结束) 如果用户并发提交换领码,都能通过可以换领(A)的判断,因为必须有一个执行换领(B)后,才会

php 使用redis锁限制并发访问类

1.并发访问限制问题 对于一些需要限制同一个用户并发访问的场景,如果用户并发请求多次,而服务器处理没有加锁限制,用户则可以多次请求成功. 例如换领优惠券,如果用户同一时间并发提交换领码,在没有加锁限制的情况下,用户则可以使用同一个换领码同时兑换到多张优惠券. 伪代码如下: if A(可以换领) B(执行换领) C(更新为已换领) D(结束) 如果用户并发提交换领码,都能通过可以换领(A)的判断,因为必须有一个执行换领(B)后,才会更新为已换领(C).因此如果用户在有一个更新为已换领之前,有多少次

redis锁的进化历程

日常工作中总是会有高并发的场景,需要实现锁机制来保证序列性,接下来我们一步一步实现一个 单机Redis下完全可靠的Redis锁(ps: 如果是Redis集群的话,就存在主从切换锁失效的问题,解决这个问题的话就比较麻烦了,这里不做讨论,现有的解决方案有redlock,大家可以看下它的实现原理) Redis锁 第一版(php实现): //加锁 public function lock($key) { $redisConnect = Redis::connection(); $v = $redisCo

利用Redis锁解决高并发问题

这里我们主要利用Redis的setnx的命令来处理高并发. setnx 有两个参数.第一个参数表示键.第二个参数表示值.如果当前键不存在,那么会插入当前键,将第二个参数做为值.返回 1.如果当前键存在,那么会返回0. 创建库存表 CREATE TABLE `storage` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `number` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=Inno

php实习redis锁机制

<?php class Redis_lock { public static function getRedis() { $redis = new redis(); $redis->connect('182.254.208.72', 3838, 0); $redis->auth('eh.123'); return $redis; } public static function lock($key, $expire = 60) { if(!$key) { return false; }

分布式定时任务的redis锁实现

一个web项目如果部署为分布式时,平时常见的定时服务在一定的间隔时间内,可能出现多次重复调用的问题.而此时由于是不同容器之间的竞争,因此需要容器级别的锁 Redis为单进程单线程模式,采用队列模式将并发访问变为串行访问.Redis本身没有锁的概念,Redis对于多个客户端连接并不存在竞争.但是可以通过setnx来实现锁 SETNX命令(SET if Not exists) 语法: SETNX key value 功能:将 key 的值设为 value ,当且仅当 key 不存在:若给定的 key

redis锁

拓展 ycgwl-cache RedisStorage /** * 如果不存在则 写入缓存并设置过期时间 * @param key * @param value * @param expire * @return */ public Long setnx(String key, V value,int expire) { Jedis j = null; String svalue = JsonUtils.toJsonString(value); boolean borrowOrOprSucces

redis 锁

import threading import time from redis import Redis class Myredis(Redis): def __init__(self): super(Myredis, self).__init__(host="192.168.31.21", port=6379, db=0, password="a123456") def mysetex(self,name,time,value): print("this