1,ReentrantLock实现了Lock接口,下面是Lock接口。定义了一些Lock的基本操作。
2,ReentrantLock根据在高并发下获取锁的算法分为FairSync和NonfairSync两种。默认为NonfairSync。
3,FairSync和NonfairSync继承了Sync。而Sync是锁的基础控制类。FairSync依然需要检查当前线程是否是等待队列的第一个,NonfairSync则不需要直接从列表中取一个。实际中公平锁吞吐量比非公平锁小很多。
4,Sync通过AbstractQueuedSynchronizer(AQS)来管理对Lock的使用。AQS内部通过维护一个lock queue来维护对锁的争用。改lock queue是CLH locks 的一个变种。下面是节点的定义
static final class Node { /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node until * transferred. (Use of this value here * has nothing to do with the other uses * of the field, but simplifies mechanics.) * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified only using * CAS. */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned once during enqueuing, and * nulled out (for sake of GC) when no longer needed. Upon * cancellation, we cannot adjust this field, but can notice * status and bypass the node if cancelled. The enq operation * does not assign next field of a predecessor until after * attachment, so seeing a null next field does not * necessarily mean that node is at end of queue. However, if * a next field appears to be null, we can scan prev's from * the tail to double-check. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if * null. Use when predecessor cannot be null. * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
5,ReentrantLock中对锁的调用实际调的都是Sync的子类。比如:
public void lock() { sync.lock(); }
6,在中有一个重要的方法如下。通过该方法我们可以获取Condition.通过条件变量Condition,我们更灵活的使用锁。可以实现类似Object.wait/notify/notifyAll的应用场景
public Condition newCondition() { return sync.newCondition(); }
7,下面是condition的例子
package com.j2se.concurrence.lockcondition; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Lock test * * @author joeyon * @date Aug 14, 2014 8:04:57 PM */ public class LockTest { private static int i; private static Lock lk = new ReentrantLock(); public static void test() { List<Thread> list = new ArrayList<Thread>(); int tcount = 3; // prepare threads for (int i = 0; i < tcount; i++) { list.add(new Thread(new TmpRunnable(), "t-" + i)); } // start threads for (int i = 0; i < tcount; i++) { list.get(i).start(); } } private static class TmpRunnable implements Runnable { @Override public void run() { lk.lock(); try { printTime("begin"); Thread.sleep(1000 * 1); // sleep a while, for test purpose printTime("end"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lk.unlock(); } } } public static void printTime() { printTime(""); } /** * print thread name & time * * @param info * additional info to print */ public synchronized static void printTime(String info) { System.out.printf("%s:\t%d,\t,%d,\t%s\n", Thread.currentThread().getName(), ++i, System.currentTimeMillis() / 1000, info); } public static void main(String[] args) { test(); } }
8,read/write lock例子 :
package com.j2se.concurrence.lockcondition; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Read Write Lock test * * @author joeyon * @date Aug 14, 2014 10:34:08 PM */ public class ReadWriteLockTest { private static int i; private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private static Lock rlk = lock.readLock(); private static Lock wlk = lock.writeLock(); private static String data = ""; private static volatile long lastUpdate; // track last publish date /** * publish data, use write lock, * * @param newData */ public static void publish(String newData) { wlk.lock(); try { printTime("begin publish"); data = newData; lastUpdate = System.currentTimeMillis(); // modify last update date printTime("data:\n\t" + data); printTime("end publish"); } catch (Exception e) { e.printStackTrace(); } finally { wlk.unlock(); } } /** * view data, use read lock * * @param previousView * last viewed publish date * @return date of new publish, or -1 if no new publish */ public static long view(long previousView) { if (previousView < lastUpdate) { // new publish rlk.lock(); try { printTime("view data:\n\t" + data); } catch (Exception e) { e.printStackTrace(); } finally { rlk.unlock(); } return lastUpdate; } else { // no new publish printTime("no new publish yet"); return -1; } } public static void test() throws InterruptedException { Thread tPublish = new Thread(new Runnable() { @Override public void run() { while (true) { publish("hi, xxxxxx, data_" + i + "_xxxxxx"); try { Thread.sleep(1000 * 10); // update interval } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t-publish"); // prepare view threads int tViewCount = 3; // count of view thread List<Thread> tViewList = new ArrayList<Thread>(); final List<Long> tLastView = new ArrayList<Long>(); // keep track of last viewed publish date for (int i = 0; i < tViewCount; i++) { final int _index = i; tViewList.add(new Thread(new Runnable() { @Override public void run() { while (true) { long _lastDate = view(tLastView.get(_index)); if (_lastDate > 0) { tLastView.set(_index, _lastDate); // update last viewed publish date, if has new publish } try { Thread.sleep(1000 * 4); // view interval } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t-view-" + i)); tLastView.add(0L); } tPublish.start(); for (int i = 0; i < tViewCount; i++) { tViewList.get(i).start(); Thread.sleep(1000 * 5); // start interval of view threads } } public static void printTime() { printTime(""); } /** * print thread name & time * * @param info * additional info to print */ public synchronized static void printTime(String info) { System.out.printf("%s:\t%d,\t,%d,\t%s\n", Thread.currentThread().getName(), ++i, System.currentTimeMillis() / 1000, info); } public static void main(String[] args) throws InterruptedException { test(); } }
参考:http://kuchaguangjie.iteye.com/blog/1632154
http://www.blogjava.net/xylz/archive/2010/07/15/326152.html
http://www.cnblogs.com/MichaelPeng/archive/2010/02/12/1667947.html
http://flyfoxs.iteye.com/blog/2101244
时间: 2024-10-27 07:24:28