基于zookeeper分布式全局序列分布式锁详解

InterProcessMutex 类详解步骤:获取锁的过程步骤:

1.acquire方法,根据当前线程获取锁对象,判断当前的线程是否已经获取锁,此处则代表可重入;
2.获取锁方法,String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
3.当获取到锁时,则把锁数据放入内存对象
private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();

获取锁的过程步骤:

1.每个线程进入获取锁方法之后,直接调用zookeeper创建对应的节点数据;你下为/rrrrww节点下对应的线程所创建的节点

[zk: localhost:2181(CONNECTED) 4] ls /rrrrw
[_c_73f6f5a1-3172-435a-8cb7-cb685edd5850-lock-0000014332, _c_6f6e059a-44c2-4452-a6fe-a8fd46e86b2f-lock-0000014339, _c_1ba53a24-7700-4f8c-a91f-8db5c036c50c-lock-0000014338, _c_2123b109-e7d2-4d04-9a6c-2df0b627be15-lock-0000014335, _c_33672d76-265e-4998-9bb3-16218d9eca21-lock-0000014328, _c_f7ffbf6b-e0dd-4ef9-925f-233912cdff74-lock-0000014337, _c_d60409a6-190e-4aa9-a40b-b354bfacc29a-lock-0000014336, _c_89994cb9-9c19-4fa3-b07f-2af48d823d08-lock-0000014334]

2.获取节点下所有已经创建的子节点,来处理第一个进入的节点,第一个进入的节点,则会获取到锁;

3.如果没有获取到锁,则watcher当前的节点变化,让当前线程等待,当节点变化时,则notifyall所有的线程,则继续获取锁;

3.增加序列之后,则release锁;

以下代码根据zookeeper实现了分布式锁获取全局序列

package com.freeplatform.common.core.seq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;

/**
 *
 * <p>Description: </p>
 * @date 2016年5月12日
 * @author 李世佳
 */
public class DistributedLockSeq {

    public static final String LOCK_ZNODE = "/rrrrw";

    public static CuratorFramework client;

    public static CuratorFrameworkFactory.Builder builder;

    static {
        client = CuratorFrameworkFactory.newClient("172.16.35.9:2181", new ExponentialBackoffRetry(1000, 3));

        builder = CuratorFrameworkFactory.builder().connectString("172.16.35.9:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3));
        // etc. etc.
    }

    public static void main(String[] args) {

        final ExecutorService service = Executors.newFixedThreadPool(20);

        for (int i = 0; i < 1; i++) {
            service.execute(new SeqTask("[Concurrent-" + i + "]"));
        }

        if (!service.isShutdown()) {
            try {
                service.shutdown();
                if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
                    service.shutdownNow();
                }
            } catch (InterruptedException e) {
                service.shutdownNow();
                System.out.println(e.getMessage());
            }
        }
    }

    // 借助curatorFramework利用Zookeeper实现分布式seq生成
    public static class SeqTask implements Runnable {

        private final String seqTaskName;

        public SeqTask(String seqTaskName) {
            this.seqTaskName = seqTaskName;
        }

        @Override
        public void run() {
            CuratorFramework client = builder.build();
            client.start();
            // 锁对象 client 锁节点
            InterProcessMutex lock = new InterProcessMutex(client, LOCK_ZNODE);
            try {
                boolean retry = true;
                int i = 0;
                do {
                    i++;
                    System.out.println(seqTaskName + " recome:" + i);
                    // 索取锁,设置时长1s,如果获取不到,则继续获取
                    if (lock.acquire(1000, TimeUnit.MILLISECONDS)) {
                        Stat stat = client.checkExists().forPath(LOCK_ZNODE);
                        if (stat != null) {
                            // 获取锁操作则增加序列
                            byte[] oldData = client.getData().storingStatIn(stat).forPath(LOCK_ZNODE);
                            String s = new String(oldData);
                            int d = Integer.parseInt(s);
                            d = d + 1;
                            s = String.valueOf(d);
                            byte[] newData = s.getBytes();
                            client.setData().forPath(LOCK_ZNODE, newData);
                            System.out.println(seqTaskName + " obtain seq :" + new String(newData));
                        }
                        retry = false;
                    }
                } while (retry);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (lock.isAcquiredInThisProcess()) {
                        lock.release();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    CloseableUtils.closeQuietly(client);
                }
            }
        }
    }
}
/**
     * Acquire the mutex - blocks until it‘s available or the given time expires. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

  private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn‘t necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                if ( localLockNodeBytes != null )
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can‘t find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //获取父节点下所有线程的子节点
                List<String>        children = getSortedChildren();

                //获取当前线程的节点名称
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                 //计算当前节点是否获取到锁
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                     //没有索取到锁,则让线程等待,并且watcher当前节点,当节点有变化的之后,则notifyAll当前等待的线程,让它再次进入来争抢锁
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }    
@Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        //maxLeases=1,即表示第一个进入的线程所创建的节点获取锁,其他则无,线程等待,watcher节点,节点变化,继续抢占锁
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);

        boolean         getsTheLock = ourIndex < maxLeases;
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }
private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            notifyFromWatcher();
        }
    };

private synchronized void notifyFromWatcher()
    {
        notifyAll();
    }
				
时间: 2024-07-31 03:43:50

基于zookeeper分布式全局序列分布式锁详解的相关文章

基于柯西矩阵的Erasure Code技术详解

一.概述 Erasure Code可以应用于分布式存储系统中,替代多份数据拷贝的数据冗余方式,从而可以提高存储空间利用率.此外,Erasure code还可以应用于传统RAID系统中,增加数据冗余度,支持多块盘同时发生故障,从而可以提高数据可靠性. 采用范德蒙矩阵可以构建Erasure code(关于范德蒙矩阵的编解码方法,可以参考文章<基于范德蒙矩阵的Erasure code技术详解>),其生成矩阵表示如下: 采用范德蒙矩阵作为编码矩阵的问题在于算法复杂度太高,其解码算法复杂度为O(n^3)

mysql 锁详解

锁是计算机协调多个进程或线程并发访问某一资源的机制.在数据库中,除传统的计算资源(如CPU.RAM.I/O等)的争用以外,数据也是一种供许多用户共享的资源.如何保证数据并发访问的一致性.有效性是所有数据库必须解决的一个问题,锁冲突也是影响数据库并发访问性能的一个重要因素. MySQL有3种锁机制,特性可大致归纳如下. ·表级锁:开销小,加锁快:不会出现死锁:锁定粒度大,发生锁冲突的概率最高,并发度最低. ·行级锁:开销大,加锁慢:会出现死锁:锁定粒度最小,发生锁冲突的概率最低,并发度也最高. ·

有序序列ol li 详解(ol li 标号未显示?)

ol定义和用法 <ol> 标签定义了一个有序列表. 列表排序以数字来显示.使用<li> 标签来定义列表选项. 基本语法如下: <ol> <li>菠萝</li> <li>西瓜</li> <li>火龙果</li> </ol> 运行结果: ol属性 属性 值 描述 compact compact HTML5中不支持,不赞成使用.请使用样式取代它. 规定列表呈现的效果比正常情况更小巧. reve

maven全局配置文件settings.xml详解

maven全局配置文件settings.xml详解 https://www.cnblogs.com/jingmoxukong/p/6050172.html?utm_source=gold_browser_extension 各种标签说明,没有<distributionManagement> 原文地址:https://www.cnblogs.com/stono/p/9456886.html

spring基于通用Dao的多数据源配置详解【ds1】

spring基于通用Dao的多数据源配置详解 有时候在一个项目中会连接多个数据库,需要在spring中配置多个数据源,最近就遇到了这个问题,由于我的项目之前是基于通用Dao的,配置的时候问题不断,这种方式和资源文件冲突:扫描映射文件的话,SqlSessionFactory的bean名字必须是sqlSessionFactory 他读不到sqlSessioNFactory2或者其他名字,最终解决方法如下: 1.在项目中加入如下类MultipleDataSource.java ? 1 2 3 4 5

“全栈2019”Java多线程第二十八章:公平锁与非公平锁详解

难度 初级 学习时间 10分钟 适合人群 零基础 开发语言 Java 开发环境 JDK v11 IntelliJ IDEA v2018.3 文章原文链接 "全栈2019"Java多线程第二十八章:公平锁与非公平锁详解 下一章 "全栈2019"Java多线程第二十九章:可重入锁与不可重入锁详解 学习小组 加入同步学习小组,共同交流与进步. 方式一:关注头条号Gorhaf,私信"Java学习小组". 方式二:关注公众号Gorhaf,回复"J

Java深入学习11:Lock锁详解

Java深入学习11:Lock锁详解 一.Lock锁是什么 java.util.concurrent.locks包下常用的类与接口(lock是jdk 1.5后新增的) Lock 接口支持那些语义不同(重入.公平等)的锁规则,可以在非阻塞式结构的上下文(包括 hand-over-hand 和锁重排算法)中使用这些规则.主要的实现是 ReentrantLock. Lock 实现提供了比 synchronized 关键字 更广泛的锁操作,它能以更优雅的方式处理线程同步问题.也就是说,Lock提供了比s

分布式锁详解

基于数据库: 基于数据库表做乐观锁,用于分布式锁.(version) 基于数据库表做悲观锁(InnoDB,for update) 基于数据库表数据记录做唯一约束(表中记录方法名称) 基于缓存: 使用redis的setnx()用于分布式锁.(setNx,直接设置值为当前时间+超时时间,保持操作原子性) 使用memcached的add()方法,用于分布式锁. 使用Tair的put()方法,用于分布式锁. 基于Zookeeper: 每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定

(三)分布式数据库tidb-隔离级别详解

tidb隔离级别详解: 1.TiDB 支持的隔离级别是 Snapshot Isolation(SI),它和 Repeatable Read(RR) 隔离级别基本等价,详细情况如下: ● TiDB 的 SI 隔离级别可以克服幻读异常(Phantom Reads),但 ANSI/ISO SQL 标准中的 RR 不能. 所谓幻读是指:事务 A 首先根据条件查询得到 n 条记录,然后事务 B 改变了这 n 条记录之的 m 条记录或者增添了 m 条符合事务 A 查询条件的记录,导致事务 A 再次发起请求时