多线程并发编程总结(二)

本文基于https://github.com/h2pl/Java-Tutorial的总结

ReentrantReadWriteLock(读写锁)源码分析

ReentrantReadWriteLock 分为读锁和写锁两个实例:
    读锁是共享锁,可被多个线程同时使用,写锁是独占锁。
    持有写锁的线程可以继续获取读锁(锁降级),反之不行(持有读锁必须先释放才能再次获取写锁)。

AQS 的精髓在于内部的属性 state:

    独占模式,通常就是 0 代表可获取锁,>=1 代表锁被别人获取了。
    共享模式下,每个线程都可以对 state 进行加减操作。

    独占模式和共享模式对于 state 的操作完全不一样,
    读写锁 ReentrantReadWriteLock 中是将 state 这个 32 位的 int 值分为高 16 位和低 16位,
    分别用于共享模式和独占模式。

/**
 * 读锁
 */
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {

    private final ReentrantReadWriteLock.ReadLock readerLock;

    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        //分享锁(高16位)
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        //独占锁(低16位)
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    }

    public static class ReadLock implements Lock, java.io.Serializable {

        private final Sync sync;

        public void lock() {
            sync.acquireShared(1);
        }
    }
}

AbstractQueuedSynchronizer:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)  //尝试获取锁
            doAcquireShared(arg);   //获取锁失败,进入阻塞队列
    }

ReentrantReadWriteLock:

    abstract static class Sync extends AbstractQueuedSynchronizer {

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)   //非当前线程持有写锁,获取读锁失败(如果是当前线程,可以锁降级)
                return -1;
            int r = sharedCount(c); //当前持有读锁的次数
            //获取读锁
            //如果公平模式:判断阻塞队列是否有等待
            //如果非公平模式:判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //处理可重入锁( readerShouldBlock()中阻塞队列有等待而使其获取重入锁失败,则在此方法中获取重入锁 )
            return fullTryAcquireShared(current);
        }
    }

/**
 * 写锁
 */
写锁是独占锁:
    如果有读锁被占用,写锁获取是要进入到阻塞队列中等待的。

JUC 一 ReentrantReadWriteLock

BlockingQueue(不接受 null 值的插入)

public interface BlockingQueue<E> extends Queue<E> {}

BlockingQueu 阻塞队列

ArrayBlockingQueue(数组。有界)

ArrayBlockingQueue 实现并发同步的原理是:根据 Condition 实现。

    读操作和写操作都需要获取到 AQS 独占锁才能进行操作。

    如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。

    如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;

    int takeIndex;  //下一次读取操作的位置

    int putIndex;   //下一次写入操作的位置

    int count;

    final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;

    transient Itrs itrs;

    //写元素
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;   //获取锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();    //队列满了,写线程阻塞,等待被唤醒
            enqueue(e); //队列不满,写线程被唤醒,写元素,并唤醒读元素线程
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E e) {
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();  //唤醒读线程
    }
}

LinkedBlockingQueue(链表。可以无界,也可以有界)

底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

    private final int capacity;

    private final AtomicInteger count = new AtomicInteger();

    transient Node<E> head; //对头

    private transient Node<E> last; //队尾

    private final ReentrantLock takeLock = new ReentrantLock();

    private final Condition notEmpty = takeLock.newCondition();

    private final ReentrantLock putLock = new ReentrantLock();

    private final Condition notFull = putLock.newCondition();
}

takeLock 和 notEmpty 搭配:
    如果要获取(take)一个元素,需要获取 takeLock 锁,
    但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。

putLock 需要和 notFull 搭配:
    如果要插入(put)一个元素,需要获取 putLock 锁,
    但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。

    //写元素
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();    //获取写锁
        try {
            while (count.get() == capacity) {
                notFull.await();    //队列满了,写线程阻塞
            }
            enqueue(node);  //被别人唤醒,队列未满,写元素
            c = count.getAndIncrement();
            if (c + 1 < capacity)   //写进一个元素后,如果还有空位,唤醒别的写线程
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
            //那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
            signalNotEmpty();
    }

    private void enqueue(Node<E> node) {
        last = last.next = node;    //写元素(将元素放在链表队尾)
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

SynchronousQueue(读写匹配,不提供储存元素的空间。可以选择公平与非公平)

Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

transfer 的设计思路:

    当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致
    (如当前操作是 put 操作,而队列中的元素也都是写线程)。
    这种情况下,将当前线程加入到等待队列即可。

    如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。
    这种情况下,匹配等待队列的队头,出队,返回相应数据。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    static final class TransferQueue<E> extends Transferer<E> { //公平模式

        static final class QNode {  //单向链表等待队列
            volatile QNode next;
            volatile Object item;
            volatile Thread waiter;
            final boolean isData;

        }

        transient volatile QNode head;

        transient volatile QNode tail;

        transient volatile QNode cleanMe;

        E transfer(E e, boolean timed, long nanos) {}
    }
}

PriorityBlockingQueue(数组。无界。二叉堆)

插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。
它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

迭代并遍历时,不能保证有序性:
    如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    private transient Object[] queue;

    private transient int size;

    private transient Comparator<? super E> comparator;

    private final ReentrantLock lock = new ReentrantLock(); //并发控制采用的是 ReentrantLock

    private final Condition notEmpty = lock.newCondition();

    private transient volatile int allocationSpinLock;  //数组扩容的时候,需要先获取到这个锁,才能进行扩容操作

    private PriorityQueue<E> q; //用于序列化和反序列化的时候用
}

原文地址:https://www.cnblogs.com/loveer/p/11829537.html

时间: 2024-11-02 14:28:56

多线程并发编程总结(二)的相关文章

多线程并发编程

前言 多线程并发编程是Java编程中重要的一块内容,也是面试重点覆盖区域,所以学好多线程并发编程对我们来说极其重要,下面跟我一起开启本次的学习之旅吧. 正文 线程与进程 1 线程:进程中负责程序执行的执行单元线程本身依靠程序进行运行线程是程序中的顺序控制流,只能使用分配给程序的资源和环境 2 进程:执行中的程序一个进程至少包含一个线程 3 单线程:程序中只存在一个线程,实际上主方法就是一个主线程 4 多线程:在一个程序中运行多个任务目的是更好地使用CPU资源 线程的实现 继承Thread类 在j

Java 多线程并发编程面试笔录一览

知识体系图: 1.线程是什么? 线程是进程中独立运行的子任务. 2.创建线程的方式 方式一:将类声明为 Thread 的子类.该子类应重写 Thread 类的 run 方法 方式二:声明实现 Runnable 接口的类.该类然后实现 run 方法 推荐方式二,因为接口方式比继承方式更灵活,也减少程序间的耦合. 3.获取当前线程信息? Thread.currentThread() 4.线程的分类 线程分为守护线程.用户线程.线程初始化默认为用户线程. setDaemon(true) 将该线程标记为

对JAVA多线程 并发编程的理解

对JAVA多线程并发编程的理解 Java多线程编程关注的焦点主要是对单一资源的并发访问,本文从Java如何实现支持并发访问的角度,浅析对并发编程的理解,也算是对前段时间所学的一个总结. 线程状态转换 Java语言定义了5中线程状态,在任何一个时间点,一个线程只能有且只有其中一种状态,这5中状态分别是: ?  新建(New):创建后尚未启动的线程处于这种状态 ?  运行(Runable):Runable包括了操作系统线程状态中的Running和Ready,也就是处于此状态的线程可能正在执行,也有可

并发编程(二):全视角解析volatile

一.目录 1.引入话题-发散思考 2.volatile深度解析 3.解决volatile原子性问题 4.volatile应用场景 二.引入话题-发散思考 public class T1 { /*volatile*/ boolean running=true; public void m(){ System.out.println(Thread.currentThread().getName()+":start!"); while(running){ /*try { TimeUnit.M

Java多线程并发编程

Thread和Runnable Runnable接口可以避免继承自Thread类的单继承的局限性. Runnable的代码可以被多个线程(Thread的实例)所共享,适合于多个线程共享资源(其实就是持有同一个runnable实例)的情况. 以火车站买票为例,分别以继承Thread类和实现Runnable接口这两种方式来模拟3个线程卖5张票: 使用Thread类模拟卖票 1 class MyThread extends Thread{ 2 3 private int ticketCount = 5

java多线程并发编程与CPU时钟分配小议

我们先来研究下JAVA的多线程的并发编程和CPU时钟振荡的关系吧 老规矩,先科普 我们的操作系统在DOS以前都是单任务的 什么是单任务呢?就是一次只能做一件事 你复制文件的时候,就不能重命名了 那么现在的操作系统,我一边在这边写BLOG,一边听歌,一边开着QQ,一边…………………… 显然,现在的操作系统都是多任务的操作系统 操作系统对多任务的支持是怎么样的呢? 每打开一个程序,就启动一个进程,为其分配相应空间(主要是运行程序的内存空间) 这其实就支持并发运行了 CPU有个时钟频率,表示每秒能执行

Java多线程-并发编程模型

以下内容转自http://ifeve.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E6%A8%A1%E5%9E%8B/: 并发系统可以采用多种并发编程模型来实现.并发模型指定了系统中的线程如何通过协作来完成分配给它们的作业.不同的并发模型采用不同的方式拆分作业,同时线程间的协作和交互方式也不相同.这篇并发模型教程将会较深入地介绍目前(2015年,本文撰写时间)比较流行的几种并发模型. 并发模型与分布式系统之间的相似性 本文所描述的并发模型类似于分布式系统中使

(转)《深入理解java虚拟机》学习笔记10——并发编程(二)

Java的并发编程是依赖虚拟机内存模型的三个特性实现的: (1).原子性(Atomicity): 原子性是指不可再分的最小操作指令,即单条机器指令,原子性操作任意时刻只能有一个线程,因此是线程安全的. Java内存模型中通过read.load.assign.use.store和write这6个操作保证变量的原子性操作. long和double这两个64位长度的数据类型java虚拟机并没有强制规定他们的read.load.store和write操作的原子性,即所谓的非原子性协定,但是目前的各种商业

java高并发编程(二)

马士兵java并发编程的代码,照抄过来,做个记录. 一.分析下面面试题 /** * 曾经的面试题:(淘宝?) * 实现一个容器,提供两个方法,add,size * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束 * * 分析下面这个程序,能完成这个功能吗? * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List

互联网架构多线程并发编程高级教程(下)

基础篇幅:线程基础知识.并发安全性.JDK锁相关知识.线程间的通讯机制.JDK提供的原子类.并发容器.线程池相关知识点 高级篇幅:ReentrantLock源码分析.对比两者源码,更加深入理解读写锁,JAVA内存模型.先行发生原则.指令重排序 环境说明:    idea.java8.maven  第四章--锁 01 锁的分类 自旋锁: 线程状态及上下文切换消耗系统资源,当访问共享资源的时间短,频繁上下文切换不值得.jvm实现,使线程在没获得锁的时候,不被挂起,转而执行空循环,循环几次之后,如果还