java并发编程之原子操作

  先来看一段简单的代码,稍微有点并发知识的都可以知道打印出结果必然是一个小于20000的值

package com.example.test.cas;

import java.io.IOException;

/**
 * @author hehang on 2019-10-09
 * @description
 */
public class LockDemo {

    private volatile  int i;

    public void add(){
        i++;
    }

    public static void main(String[] args) throws IOException {

        LockDemo lockDemo = new LockDemo();
        for (int i = 0; i <2 ; i++) {
            new Thread(() ->{
                for (int j = 0; j <10000 ; j++) {
                    lockDemo.add();
                }
            }).start();
        }
        System.in.read();
        System.out.println(lockDemo.i);
    }
}

  改进一下,使用jdk给我们提供的原子操作类,达到了我们预想的结果

package com.example.test.cas;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author hehang on 2019-10-14
 * @description
 */
public class AtomicTest {
    public static void main(String[] args) throws InterruptedException {
        // 自增
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    atomicInteger.incrementAndGet();
                }
            }).start();
        }
        Thread.sleep(2000L);
        System.out.println(atomicInteger.get());
    }
}

  下面就来探究下jdk为我们提供的原子操作类的原理,基于java native方法实现一个自己原子操作类

package com.example.test.cas;

import sun.misc.Unsafe;

import java.io.IOException;
import java.lang.reflect.Field;

/**
 * @author hehang on 2019-10-09
 * @description
 */
public class LockCASDemo {

    private volatile  int i;

    private static Unsafe unsafe;

    private static long offset;
    static{
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
            offset = unsafe.objectFieldOffset(LockCASDemo.class.getDeclaredField("i"));
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void add(){
        int curent;
        int value;
        do{
            curent = unsafe.getIntVolatile(this,offset);
            value = curent+1;
        }while (!unsafe.compareAndSwapInt(this,offset,curent,value));
    }

    public static void main(String[] args) throws IOException {

        LockCASDemo lockDemo = new LockCASDemo();
        for (int i = 0; i <2 ; i++) {
            new Thread(() ->{
                for (int j = 0; j <10000 ; j++) {
                    lockDemo.add();
                }
            }).start();
        }
        System.in.read();
        System.out.println(lockDemo.i);
    }
}

  实现这样一个类的要点有:1、基于反射机制获取UnSafe对象2、利用UnSafe对象获取属性偏移量,然后调用compareAndSwapInt方法,比较和替换是硬件同步原语,处理器提供了基于内存操作的原子性保证。

  以上的代码未免麻烦,因此jdk为我们封装了一些原子操作类来简化使用,打开这些原子操作类的源代码,可以发现其内部实现就是循环+调用native方法(比较替换),常用的原子操作类如下:

  cas存在的三个问题

  1、循环+cas,自旋的实现让cpu处于高频运行状态,争抢cpu执行时间,如果并发太高,部分线程长时间执行不成功,带来很大的cpu消耗

  2、只能针对单个变量实现原子操作

  3、出现ABA问题

  针对第一个问题,jdk1.8为我们提供了增强版的计数器,内部利用分而治之的思想来减少线程间的cpu争抢,提高并发效率,具体可以看下面的测试

package com.example.test.cas;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author hehang on 2019-10-14
 * @description asd
 */
public class LongAdderDemo {
    private long count = 0;

    // 同步代码块的方式
    public void testSync() throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < 2000) { // 运行两秒
                    synchronized (this) {
                        ++count;
                    }
                }
                long endtime = System.currentTimeMillis();
                System.out.println("SyncThread spend:" + (endtime - starttime) + "ms" + " v" + count);
            }).start();
        }
    }

    // Atomic方式
    private AtomicLong acount = new AtomicLong(0L);

    public void testAtomic() throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < 2000) { // 运行两秒
                    acount.incrementAndGet(); // acount++;
                }
                long endtime = System.currentTimeMillis();
                System.out.println("AtomicThread spend:" + (endtime - starttime) + "ms" + " v-" + acount.incrementAndGet());
            }).start();
        }
    }

    // LongAdder 方式
    private LongAdder lacount = new LongAdder();
    public void testLongAdder() throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < 2000) { // 运行两秒
                    lacount.increment();
                }
                long endtime = System.currentTimeMillis();
                System.out.println("LongAdderThread spend:" + (endtime - starttime) + "ms" + " v-" + lacount.sum());
            }).start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LongAdderDemo demo = new LongAdderDemo();
        demo.testSync();
        demo.testAtomic();
        demo.testLongAdder();
    }
}

  三种方式在同样的时间内,自增的数值如下,可以看到LongAdder的效率更高一些

SyncThread spend:2000ms v23074332
SyncThread spend:2000ms v23094924
AtomicThread spend:2000ms v-38137398
AtomicThread spend:2000ms v-38152694
SyncThread spend:2011ms v23094924
AtomicThread spend:2000ms v-38416095
LongAdderThread spend:2000ms v-40097562
LongAdderThread spend:2000ms v-40606405
LongAdderThread spend:2001ms v-40917467

Process finished with exit code 0

  针对第二个问题,我们只能通过加锁或者其它手段其处理,这里不做展开

  针对第三个问题,我们先模拟出以下的场景来展示这个问题

package com.example.test.cas.aba;

/**
 * @author hehang on 2019-10-14
 * @description
 */
public class Node {
    public final String item;
    public Node next;

    public Node(String item) {
        this.item = item;
    }

    @Override
    public String toString() {
        return "item内容:" + this.item;
    }
}
package com.example.test.cas.aba;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author hehang on 2019-10-14
 * @description
 */
// 实现一个 栈(后进先出)
public class Stack {
    // top cas无锁修改
    AtomicReference<Node> top = new AtomicReference<Node>();

    public void push(Node node) { // 入栈
        Node oldTop;
        do {
            oldTop = top.get();
            node.next = oldTop;
        }
        while (!top.compareAndSet(oldTop, node)); // CAS 替换栈顶
    }

    // 为了演示ABA效果, 增加一个CAS操作的延时
    public Node pop(int time) throws InterruptedException { // 出栈 -- 取出栈顶

        Node newTop;
        Node oldTop;
        do {
            oldTop = top.get();
            if (oldTop == null) {
                return null;
            }
            newTop = oldTop.next;
            if (time != 0) {
                System.out.println(Thread.currentThread() + " 休眠前拿到的数据" + oldTop.item);
                TimeUnit.SECONDS.sleep(time); // 休眠指定的时间
            }
        }
        while (!top.compareAndSet(oldTop, newTop));
        return oldTop;
    }
}
package com.example.test.cas.aba;

/**
 * @author hehang on 2019-10-14
 * @description
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Stack stack = new Stack();
        stack.push(new Node("B"));
        stack.push(new Node("A"));

        Thread thread1 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(3));
                // 再继续拿,就会有问题了,理想情况stack出数据应该是 A->C->D->B,实际上ABA问题导致A-B->null
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0));
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0));
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread1.start();

        Thread.sleep(300); // 让线程1先启动

        Thread thread2 = new Thread(() -> {
            Node A = null;
            try {
                A = stack.pop(0);
                System.out.println(Thread.currentThread() + " 拿到数据:" + A);
                stack.push(new Node("D"));
                stack.push(new Node("C"));
                stack.push(A);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread2.start();
    }
}

  在上面的例子中我们想实现一个栈,单线程情况下是没有任何问题的,但是在并发场景下就会出现丢数据的问题,运行结果:

Thread[Thread-0,5,main] 睡一下,预期拿到的数据A
Thread[Thread-1,5,main] 拿到数据:item内容:A
Thread[Thread-0,5,main] 拿到数据:item内容:A
Thread[Thread-0,5,main] 拿到数据:item内容:B
Thread[Thread-0,5,main] 拿到数据:null
Thread[Thread-0,5,main] 拿到数据:null

Process finished with exit code 0

  好在jdk为我们考虑了这个问题,提供了AtomicStampedReference和AtomicMarkableReference,前者基于时间戳,后者基于标记位来对同样的数据做区分,从未避免了ABA问题

package com.example.test.cas.aba;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * @author hehang on 2019-10-14
 * @description
 */
public class ConcurrentStack {
    AtomicStampedReference<Node> top = new AtomicStampedReference<Node>(null,0);
    public void push(Node node){
        Node oldTop;
        int v;
        do{
            v=top.getStamp();
            oldTop = top.getReference();
            node.next = oldTop;
        }
        while(!top.compareAndSet(oldTop, node,v,v+1));
        //   }while(!top.compareAndSet(oldTop, node,top.getStamp(),top.getStamp()+1));
    }
    public Node pop(int time){
        Node newTop;
        Node oldTop;
        int v;
        do{
            v=top.getStamp();
            oldTop = top.getReference();
            if(oldTop == null){
                return null;
            }
            newTop = oldTop.next;
            try {
                if (time != 0) {
                    System.out.println(Thread.currentThread() + " 睡一下,预期拿到的数据" + oldTop.item);
                    TimeUnit.SECONDS.sleep(time); // 休眠指定的时间
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        while(!top.compareAndSet(oldTop, newTop,v,v+1));
        //   }while(!top.compareAndSet(oldTop, newTop,top.getStamp(),top.getStamp()));
        return oldTop;
    }
    public void get(){
        Node node = top.getReference();
        while(node!=null){
            System.out.println(node.item);
            node = node.next;
        }
    }
}
package com.example.test.cas.aba;

/**
 * @author hehang on 2019-10-14
 * @description
 */
public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentStack stack = new ConcurrentStack();
        stack.push(new Node("B"));
        stack.push(new Node("A"));

        Thread thread1 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(3));
                // #再继续拿,就会有问题了,理想情况stack出数据应该是 A->C->D->B,实际上ABA问题导致A-B->null
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0));
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0));
                System.out.println(Thread.currentThread() + " 拿到数据:" + stack.pop(0));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread1.start();

        Thread.sleep(300); // 让线程1先启动

        Thread thread2 = new Thread(() -> {
            Node A = null;
            try {
                A = stack.pop(0);
                System.out.println(Thread.currentThread() + " 拿到数据:" + A);
                stack.push(new Node("D"));
                stack.push(new Node("C"));
                stack.push(A);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread2.start();
    }
}

  结果如下:

Thread[Thread-0,5,main] 睡一下,预期拿到的数据A
Thread[Thread-1,5,main] 拿到数据:item内容:A
Thread[Thread-0,5,main] 睡一下,预期拿到的数据A
Thread[Thread-0,5,main] 拿到数据:item内容:A
Thread[Thread-0,5,main] 拿到数据:item内容:C
Thread[Thread-0,5,main] 拿到数据:item内容:D
Thread[Thread-0,5,main] 拿到数据:item内容:B

Process finished with exit code 0

原文地址:https://www.cnblogs.com/hhhshct/p/11676923.html

时间: 2024-10-03 19:52:27

java并发编程之原子操作的相关文章

6、Java并发编程:volatile关键字解析

Java并发编程:volatile关键字解析 volatile这个关键字可能很多朋友都听说过,或许也都用过.在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果.在Java 5之后,volatile关键字才得以重获生机. volatile关键字虽然从字面上理解起来比较简单,但是要用好不是一件容易的事情.由于volatile关键字是与Java的内存模型有关的,因此在讲述volatile关键之前,我们先来了解一下与内存模型相关的概念和知识,然后分析了volatil

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线

JAVA并发编程J.U.C学习总结

前言 学习了一段时间J.U.C,打算做个小结,个人感觉总结还是非常重要,要不然总感觉知识点零零散散的. 有错误也欢迎指正,大家共同进步: 另外,转载请注明链接,写篇文章不容易啊,http://www.cnblogs.com/chenpi/p/5614290.html 本文目录如下,基本上涵盖了J.U.C的主要内容: JSR 166及J.U.C Executor框架(线程池. Callable .Future) AbstractQueuedSynchronizer(AQS框架) Locks & C

JAVA并发编程2_线程安全&amp;内存模型

"你永远都不知道一个线程何时在运行!" 在上一篇博客JAVA并发编程1_多线程的实现方式中后面看到多线程中程序运行结果往往不确定,和我们预期结果不一致.这就是线程的不安全.线程的安全性是非常复杂的,没有任何同步的情况下,多线程的执行顺序是不可预测的.当多个线程访问同一个资源时就会出现线程安全问题.例如有一个银行账户,一个线程往里面打钱,一个线程取钱,要是得到不确定的结果那是多么可怕的事情. 引入: 例如下面的程序,在单线程下,执行两次i++理论上i的最终值是12,但是在多线程环境下则不

Java并发编程(一)

Java并发编程(一) 之前看<Thinking In Java>时,并发讲解的挺多的,自己算是初步了解了并发.但是其讲解的不深入,自己感觉其讲解的不够好.后来自己想再学一学并发,买了<Java并发编程实战>,看了一下讲的好基础.好多的理论,而且自我感觉讲的逻辑性不强.最后,买了本<Java并发编程的艺术>看,这本书挺好的,逻辑性非常强. 1. 概述 本篇文章主要内容来自<Java并发编程的艺术>,其讲解的比较深入,自己也有许多不懂的地方,然后自己主要把它讲

Java并发编程-非阻塞同步方式原子类(Atomic)的使用

非阻塞同步 在大多数情况下,我们为了实现线程安全都会使用Synchronized或lock来加锁进行线程的互斥同步,但互斥同步的最主要的问题就是进行线程的阻塞和唤醒所带来的性能问题,因此这种阻塞也称作阻塞同步.从处理问题的方式上说,互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题,无论共享数据是否真的会出现竞争,它都会进行加锁.用户态核心态转换.维护锁的计数器和检查是否有被阻塞的线程需要被唤醒等操作. 随着硬件指令集的发展,我们有了另一个选择:基于冲突检测的乐

Java并发编程:并发容器ConcurrentHashMap

Java并发编程:并发容器之ConcurrentHashMap(转载) 下面这部分内容转载自: http://www.haogongju.net/art/2350374 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都 串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低.因此Java5.0开 始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入

java并发编程(2)--volatile

转载:http://ifeve.com/volatile/ 作者:方 腾飞 花名清英,并发网(ifeve.com)创始人,畅销书<Java并发编程的艺术>作者,蚂蚁金服技术专家.目前工作于支付宝微贷事业部,关注互联网金融,并发编程和敏捷实践. Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的“可见性”. 可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值. 共享变量:在多个线程之间能够被共享的变量被称为共享变量.共享变量包括所有的实

《Java并发编程实战》要点笔记及java.util.concurrent 的结构介绍

买了<java并发编程实战>这本书,看了好几遍都不是很懂,这个还是要在实战中找取其中的要点的,后面看到一篇文章笔记做的很不错分享给大家!! 原文地址:http://blog.csdn.net/cdl2008sky/article/details/26377433 Subsections  1.线程安全(Thread safety) 2.锁(lock) 3.共享对象 4.对象组合 5.基础构建模块 6.任务执行 7.取消和关闭 8.线程池的使用 9.性能与可伸缩性 10.并发程序的测试 11.显