zookeeper适用场景:分布式锁实现

问题导读:
1.zookeeper如何实现分布式锁?
2.什么是羊群效应?
3.zookeeper如何释放锁?

zookeeper应用场景有关于分布式集群配置文件同步问题的描述,设想一下如果有100台机器同时对同一台机器上某个文件进行修改,如何才能保证文本不会被写乱,这就是最简单的分布式锁,本文介绍利用zk实现分布式锁。下面是写锁的实现步骤

分布式写锁
create一个PERSISTENT类型的znode,/Locks/write_lock

  • 客户端创建SEQUENCE|EPHEMERAL类型的znode,名字是lockid开头,创建的znode是/Locks/write_lock/lockid0000000001
  • 调用getChildren()不要设置Watcher获取/Locks/write_lock下的znode列表
  • 判断自己步骤2创建znode是不是znode列表中最小的一个,如果是就代表获得了锁,如果不是往下走
  • 调用exists()判断步骤2自己创建的节点编号小1的znode节点(也就是获取的znode节点列表中最小的znode),并且设置Watcher,如果exists()返回false,执行步骤3
  • 如果exists()返回true,那么等待zk通知,从而在回掉函数里返回执行步骤3

释放锁就是删除znode节点或者断开连接就行

*注意:上面步骤2中getChildren()不设置Watcher的原因是,防止羊群效应,如果getChildren()设置了Watcher,那么集群一抖动都会收到通知。在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知—,这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。
下面是代码实现

import sys

class GJZookeeper(object):

    ZK_HOST = "localhost:2181"
    ROOT = "/Locks"
    WORKERS_PATH = join(ROOT,"write_lock")
    MASTERS_NUM = 1
    TIMEOUT = 10000

    def __init__(self, verbose = True):
        self.VERBOSE = verbose
        self.masters = []
        self.is_master = False
        self.path = None

        self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
        self.say("login ok!")
        # init
        self.__init_zk()
        # register
        self.register()

    def __init_zk(self):
        """
        create the zookeeper node if not exist
        |--Locks
                |--write_lock
        """
        nodes = (self.ROOT, self.WORKERS_PATH)
        for node in nodes:
            if not self.zk.exists(node):
                try:
                    self.zk.create(node, "")
                except:
                    pass

    def register(self):
        """
        register a node for this worker
        |--Locks
                |--write_lock
                            |--lockid000000000x ==> hostname
        """

        import socket
        hostname = socket.gethostname()
        self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname, flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        self.lockid = basename(self.path)
        self.say("register ok! I‘m %s" % self.path)
        # check who is the master
        self.get_lock()

    def get_lock(self):
        """
        get children znode try to get lock
        """
        @watchmethod
        def watcher(event):
            self.say("child changed, try to get lock again.")
            self.get_lock()

        children = self.zk.get_children(self.WORKERS_PATH)
        children.sort()
        min_lock_id = children[0]
        self.say("%s‘s children: %s" % (self.WORKERS_PATH, children))
        if cmp(self.lockid,min_lock_id) == 0:
            self.get_lock_success()
            return True
        elif cmp(self.lockid,min_lock_id) > 0:
            index = children.index(self.lockid)
            new_lockid_watch = join(self.WORKERS_PATH,children[index-1])
            self.say("Add watch on %s"%new_lockid_watch)
            res = self.zk.exists(new_lockid_watch,watcher)
            if not res :
                """代表没有存在之前小的锁,但是这并不意味着能拿到锁了,因为还可能有比当前还小的锁,还没轮到,要重新执行一遍"""
#               self.get_lock_success()
                return False
            else :
                """现在的锁有人在使用,等他释放了再抢"""
                self.say("I can not get the lock this time,wait for the next time")
                return False

    def get_lock_success(self):
        self.say("I get the lock !!!")
        self.write_file()
        self.zk.delete(join(self.WORKERS_PATH,self.lockid))
        self.say("I release the lock !!!")
        sys.exit(1)

    def write_file(self):
        fd = open("lock.log",‘a‘)
        fd.write("%s\n"%self.lockid)
        fd.close()

    def say(self, msg):
        """
        print messages to screen
        """
        if self.VERBOSE:
            if self.path:
                log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
            else:
                log.info(msg)

def start_get_lock():
    gj_zookeeper = GJZookeeper()

def main():
    th1 = threading.Thread(target = start_get_lock, name = "thread_1", args = ())
    th1.start()
    th1.join()

if __name__ == "__main__":
    main()
    time.sleep(1000)

  文章转自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=9267&ctid=16

时间: 2024-10-11 21:19:26

zookeeper适用场景:分布式锁实现的相关文章

整理分布式锁:业务场景&分布式锁家族&实现原理

1.引入业务场景 业务场景一出现: 因为小T刚接手项目,正在吭哧吭哧对熟悉着代码.部署架构.在看代码过程中发现,下单这块代码可能会出现问题,这可是分布式部署的,如果多个用户同时购买同一个商品,就可能导致商品出现 库存超卖 (数据不一致) 现象,对于这种情况代码中并没有做任何控制. 原来一问才知道,以前他们都是售卖的虚拟商品,没啥库存一说,所以当时没有考虑那么多... 这次不一样啊,这次是售卖的实体商品,那就有库存这么一说了,起码要保证不能超过库存设定的数量吧. 小T大眼对着屏幕,屏住呼吸,还好提

基于zookeeper简单实现分布式锁

这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制.来简单实现分布式锁. 主要思想: 1.开启10个线程.在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点. 2.获取disLocks节点下全部子节点,排序,假设自己的节点编号最小,则获取锁: 3.否则watch排在自己前面的节点,监听到其删除后,进入第2步(又一次检測排序是防止监听的节点发生连接失效.导致的节点删除情况): 4.删除自身sub节点,释放连接: 这

[ZooKeeper.net] 3 ZooKeeper的分布式锁

基于ZooKeeper的分布式锁  源码分享:http://pan.baidu.com/s/1miQCDKk ZooKeeper 里实现分布式锁的基本逻辑: 1.zookeeper中创建一个根节点(Locks),用于后续各个客户端的锁操作. 2.想要获取锁的client都在Locks中创建一个自增序的子节点,每个client得到一个序号,如果自己的序号是最小的则获得锁. 3.如果没有得到锁,就监控排在自己前面的序号节点,并且设置默认时间,等待它的释放. 4.业务操作后释放锁,然后监控自己的节点的

SpringBoot实战实现分布式锁一之重现多线程高并发场景

实战前言:上篇博文我总体介绍了我这套视频课程:"SpringBoot实战实现分布式锁" 总体涉及的内容,从本篇文章开始,我将开始介绍其中涉及到的相关知识要点,感兴趣的小伙伴可以关注关注学习学习!!工欲善其事,必先利其器,介绍分布式锁使用的前因后果之前,得先想办法说清楚为啥需要分布式锁以及如何才需要将分布式锁搬上用场!!其中,该课程的学习链接:http://edu.51cto.com/course/15684.html感兴趣的童鞋可以前往观看学习!!! 实战概要:故而此文将介绍一下分布式

zookeeper 实现分布式锁安全用法

背景 ConnectionLoss 链接丢失 SessionExpired 会话过期 绕开 zookeeper broker 进行状态通知 leader 选举与zkNode 断开 做好幂等 静态扩容.动态扩容 背景 分布式锁现在用的越来越多,通常用来协调多个并发任务.在一般的应用场景中存在一定的不安全用法,不安全用法会带来多个master在并行执行,业务或数据可能存在重复计算带来的副作用,在没有拿到lock的情况下扮演者master等诸如此类. 要想准确的拿到分布式锁,并且准确的捕获在分布式情况

分布式锁的几种使用方式(redis、zookeeper、数据库)

Q:一个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费synchronizedlockdb lockQ:两个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费分布式锁我们需要怎么样的分布式锁?可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行. 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用的获取锁和释

关于分布式锁原理的一些学习与思考-redis分布式锁,zookeeper分布式锁

首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在jdk java.util 并发包中已经为我们提供了这些方法去加锁, 比如synchronized 关键字 或者Lock 锁,都可以处理. 但是我们现在的应用程序如果只部署一台服务器,那并发量是很差的,如果同时有上万的请求那么很有可能造成服务器压力过大,而瘫痪. 想想双十一 和 三十晚上十点分支付宝红

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

[转载] zookeeper 分布式锁服务

转载自http://www.cnblogs.com/shanyou/archive/2012/09/22/2697818.html 分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那一层来挡.当大量的行锁.表锁.事务充斥着数据库的时候.一般web应用很多的瓶颈都在数据库上,这里给大家介绍的是减轻数据库锁负担的一种方案,使用zookeeper分布式锁服务. zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hiv