InterProcessMutex 类详解步骤:获取锁的过程步骤:
1.acquire方法,根据当前线程获取锁对象,判断当前的线程是否已经获取锁,此处则代表可重入;
2.获取锁方法,String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
3.当获取到锁时,则把锁数据放入内存对象
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
获取锁的过程步骤:
1.每个线程进入获取锁方法之后,直接调用zookeeper创建对应的节点数据;你下为/rrrrww节点下对应的线程所创建的节点
[zk: localhost:2181(CONNECTED) 4] ls /rrrrw
[_c_73f6f5a1-3172-435a-8cb7-cb685edd5850-lock-0000014332, _c_6f6e059a-44c2-4452-a6fe-a8fd46e86b2f-lock-0000014339, _c_1ba53a24-7700-4f8c-a91f-8db5c036c50c-lock-0000014338, _c_2123b109-e7d2-4d04-9a6c-2df0b627be15-lock-0000014335, _c_33672d76-265e-4998-9bb3-16218d9eca21-lock-0000014328, _c_f7ffbf6b-e0dd-4ef9-925f-233912cdff74-lock-0000014337, _c_d60409a6-190e-4aa9-a40b-b354bfacc29a-lock-0000014336, _c_89994cb9-9c19-4fa3-b07f-2af48d823d08-lock-0000014334]
2.获取节点下所有已经创建的子节点,来处理第一个进入的节点,第一个进入的节点,则会获取到锁;
3.如果没有获取到锁,则watcher当前的节点变化,让当前线程等待,当节点变化时,则notifyall所有的线程,则继续获取锁;
3.增加序列之后,则release锁;
以下代码根据zookeeper实现了分布式锁获取全局序列
package com.freeplatform.common.core.seq; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.data.Stat; /** * * <p>Description: </p> * @date 2016年5月12日 * @author 李世佳 */ public class DistributedLockSeq { public static final String LOCK_ZNODE = "/rrrrw"; public static CuratorFramework client; public static CuratorFrameworkFactory.Builder builder; static { client = CuratorFrameworkFactory.newClient("172.16.35.9:2181", new ExponentialBackoffRetry(1000, 3)); builder = CuratorFrameworkFactory.builder().connectString("172.16.35.9:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)); // etc. etc. } public static void main(String[] args) { final ExecutorService service = Executors.newFixedThreadPool(20); for (int i = 0; i < 1; i++) { service.execute(new SeqTask("[Concurrent-" + i + "]")); } if (!service.isShutdown()) { try { service.shutdown(); if (!service.awaitTermination(10, TimeUnit.SECONDS)) { service.shutdownNow(); } } catch (InterruptedException e) { service.shutdownNow(); System.out.println(e.getMessage()); } } } // 借助curatorFramework利用Zookeeper实现分布式seq生成 public static class SeqTask implements Runnable { private final String seqTaskName; public SeqTask(String seqTaskName) { this.seqTaskName = seqTaskName; } @Override public void run() { CuratorFramework client = builder.build(); client.start(); // 锁对象 client 锁节点 InterProcessMutex lock = new InterProcessMutex(client, LOCK_ZNODE); try { boolean retry = true; int i = 0; do { i++; System.out.println(seqTaskName + " recome:" + i); // 索取锁,设置时长1s,如果获取不到,则继续获取 if (lock.acquire(1000, TimeUnit.MILLISECONDS)) { Stat stat = client.checkExists().forPath(LOCK_ZNODE); if (stat != null) { // 获取锁操作则增加序列 byte[] oldData = client.getData().storingStatIn(stat).forPath(LOCK_ZNODE); String s = new String(oldData); int d = Integer.parseInt(s); d = d + 1; s = String.valueOf(d); byte[] newData = s.getBytes(); client.setData().forPath(LOCK_ZNODE, newData); System.out.println(seqTaskName + " obtain seq :" + new String(newData)); } retry = false; } } while (retry); } catch (Exception e) { e.printStackTrace(); } finally { try { if (lock.isAcquiredInThisProcess()) { lock.release(); } } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); } } } } }
/** * Acquire the mutex - blocks until it‘s available or the given time expires. Note: the same thread * can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call * to {@link #release()} * * @param time time to wait * @param unit time unit * @return true if the mutex was acquired, false if not * @throws Exception ZK errors, connection interruptions */ @Override public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn‘t necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { if ( localLockNodeBytes != null ) { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes); } else { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can‘t find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; }
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { //获取父节点下所有线程的子节点 List<String> children = getSortedChildren(); //获取当前线程的节点名称 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //计算当前节点是否获取到锁 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { //没有索取到锁,则让线程等待,并且watcher当前节点,当节点有变化的之后,则notifyAll当前等待的线程,让它再次进入来争抢锁 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; }
@Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { //maxLeases=1,即表示第一个进入的线程所创建的节点获取锁,其他则无,线程等待,watcher节点,节点变化,继续抢占锁 int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { notifyFromWatcher(); } }; private synchronized void notifyFromWatcher() { notifyAll(); }