非阻塞同步算法实战(四)- 计数器定时持久化

问题背景及要求

  • 需要对评论进行点赞次数和被评论次数进行统计,或者更多维度
  • 要求高并发、高性能计数,允许极端情况丢失一些统计次数,例如宕机
  • 评论很多,不能为每一个评论都一直保留其计数器,计数器需要有回收机制

问题抽象及分析

根据以上需求,为了方便编码与测试,我们把需求转化为以下接口

/**
 * 计数器
 */
public interface Counter {
    /**
     * 取出统计数据,用Saver去持久化(仅定时器会调用,无并发)
     * @param saver
     */
    void save(Saver saver);

    /**
     * 计数(有并发)
     * @param key 业务ID
     * @param like 点赞
     * @param comment 评论
     */
    void add(String key, int like, int comment);

    /**
     * 持久化器,将数量持久化到数据库等
     */
    @FunctionalInterface
    interface Saver{
        void save(String key, int like, int comment);
    }
}

简单分析可知,计数器比较简单,用AtomicInteger便能保证原子性,但考虑到计数器会被回收,则可能会出现这样的场景:某计数器已被回收了,此时继续在该计数器上计数,便会造成数据丢失,因此要处理该并发问题

解决方案

方案一

使用原生锁来解决竞争问题

/**
 * 直接对所有操作上锁,来保证线程安全
 */
public class SynchronizedCounter implements Counter{
    private HashMap<String, Adder> map = new HashMap<>();

    @Override
    public synchronized void save(Saver saver) {
        map.forEach((key, value)->{//因为已加锁,所以可以安全地取数据
            saver.save(key, value.like, value.comment);
        });
        map = new HashMap<>();
    }

    @Override
    public synchronized void add(String key, int like, int comment) {
        //因为已加锁,所以可以安全地更新数据
        Adder adder = map.computeIfAbsent(key, x -> new Adder());
        adder.like += like;
        adder.comment += comment;
    }
    static class Adder{
        private int like;
        private int comment;
    }
}

方案点评:该方案让业务线程和定时保存线程竞争同一把实例锁,让他们互斥地访问,解决了竞争问题,但锁粒度太粗爆,性能低下


方案二

为了循序渐进,我们把“计数器需要有回收机制”这条要求去掉,这样我们可以很容易地利用上AtomicInteger这个类

/**
 * 不回收计数器,问题变得简单许多
 */
public class IncompleteCounter implements Counter {
    private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
    @Override
    public void save(Saver saver) {
        map.forEach((key, value)->{//利用了AtomicInteger的原子特性,可以线程安全地取出所有计数,并置0(因为还会继续使用)
            saver.save(key, value.like.getAndSet(0), value.comment.getAndSet(0));
        });
        //因为不回收,所以不用考虑Adder被回收丢弃后,仍被其它线程使用的情况(因为没有锁,所以这种情况是可能发生的)
    }

    @Override
    public void add(String key, int like, int comment) {
        Adder adder = map.computeIfAbsent(key, k -> new Adder());
        adder.like.addAndGet(like);//利用AtomicInteger的原子特性,保证了线程安全
        adder.comment.addAndGet(comment);
    }
    static class Adder{
        AtomicInteger like = new AtomicInteger();
        AtomicInteger comment = new AtomicInteger();
    }
}

方案点评:除了没解决回收问题,简单高效


方案三

因为调用save的线程没有并发情况,阻塞也没关系,经分析可巧妙地使用读写锁,同时又不让add方法进入阻塞

/**
 * 巧妙地利用读写锁,及save方法可阻塞的特点,实现add操作无阻塞
 */
public class ReadWriteLockCounter implements Counter {
    private volatile MapWithLock mapWithLock = new MapWithLock();

    @Override
    public void save(Saver saver) {
        MapWithLock preMapWithLock = mapWithLock;
        mapWithLock = new MapWithLock();
        //不会一直阻塞,因为mapWithLock已被替换,新的add调用会拿到新的mapWithLock
        preMapWithLock.lock.writeLock().lock();
        preMapWithLock.map.forEach((key,value)->{
            //value已经废弃,故无需value.like.getAndSet(0)
            saver.save(key, value.like.get(), value.comment.get());
        });
        //不能释放该锁,否则add方法中,对被替换掉的MapWithLock.lock执行tryLock会成功
        //也许,这是你第一次见到的不需要且不允许释放的锁:)
    }

    @Override
    public void add(String key, int like, int comment) {
        MapWithLock mapWithLock;
        //如果通过tryLock获取锁失败,则表示该mapWithLock已经被废弃了(因为只有废弃了的MapWithLock才会加写锁),故重新获取最新的mapWithLock
        while(!(mapWithLock = this.mapWithLock).lock.readLock().tryLock());
        try{
            Adder adder = mapWithLock.map.computeIfAbsent(key, k -> new Adder());
            adder.like.getAndAdd(like);
            adder.comment.getAndAdd(comment);
        }finally {
            mapWithLock.lock.readLock().unlock();
        }
    }

    static class Adder{
        private AtomicInteger like = new AtomicInteger();
        private AtomicInteger comment = new AtomicInteger();

    }
    static class MapWithLock{
        private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
        private ReadWriteLock lock = new ReentrantReadWriteLock();
    }
}

方案点评:减少了锁的粒度,同时add线程可以相互兼容,大幅提升了并发能力,save线程虽会阻塞,但结合其定时执行的特点,并不受影响,且即使极端情况也不会一直阻塞


方案四

使用一个原子的state来替换LockCounter中的ReadWriteLock(因为只使用到了它的部分特性),实现wait-free,获得更高性能

/**
 * ReadWriteLockCounter的改进版,去掉ReadWriteLock,结合当前场景,实现一个wait-free的简易读写锁<br/>
 */
public class CustomLockCounter implements Counter {
    private volatile MapWithState mapWithState = new MapWithState();

    @Override
    public void save(Saver saver) {
        MapWithState preMapWithState = mapWithState;
        mapWithState = new MapWithState();
        //compareAndSet失败则表示该MapWithState正在被使用,等其使用完,它不会一直失败,因为mapWithState已经被替换
        while(!preMapWithState.state.compareAndSet(0,Integer.MIN_VALUE)){
            Thread.yield();
        }
        preMapWithState.map.forEach((key, value)->{
            //value已经废弃,故无需value.like.getAndSet(0)
            saver.save(key, value.like.get(), value.comment.get());
        });
    }

    @Override
    public void add(String key, int like, int comment) {
        MapWithState mapWithState;//add的并发,不可能将Integer.MIN_VALUE自增成正数(设置为Integer.MIN_VALUE时,该MapWithState已经被废弃了)
        while((mapWithState = this.mapWithState).state.getAndIncrement()<0);
        try{
            Adder adder = mapWithState.map.computeIfAbsent(key, k -> new Adder());
            adder.like.getAndAdd(like);
            adder.comment.getAndAdd(comment);
        }finally {
            mapWithState.state.getAndDecrement();
        }
    }

    static class Adder{
        private AtomicInteger like = new AtomicInteger();
        private AtomicInteger comment = new AtomicInteger();

    }
    static class MapWithState {
        private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
        private AtomicInteger state = new AtomicInteger();
    }
}

方案点评:保留了前一方案ReadWriteLockCounter的优点,同时结合场景的特点做了些优化,本质就是将CAS失败重试循环替换成了一条fetch-and-add指令,如果不是因为save是低频执行,本方案可能是最高效的了(暂且忽略ConcurrentHashMap等其它可能的优化空间)


方案五

先假定不会发生竞争,然后检测竞争情况,如果发生竞争,则补偿

/**
 * 乐观地假定不会发生竞争,如果发生了,则尝试进行补偿
 */
public class CompensationCounter implements Counter {
    private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
    @Override
    public void save(Saver saver) {
        for(Iterator<Map.Entry<String, Adder>> it = map.entrySet().iterator(); it.hasNext();){
            Map.Entry<String, Adder> entry = it.next();
            it.remove();
            entry.getValue().discarded = true;
            saver.save(entry.getKey(), entry.getValue().like.getAndSet(0), entry.getValue().comment.getAndSet(0));//需将计数器置0,此处存在竞争
        }
    }

    @Override
    public void add(String key, int like, int comment) {
        Adder adder = map.computeIfAbsent(key, k -> new Adder());
        adder.like.addAndGet(like);
        adder.comment.addAndGet(comment);
        if(adder.discarded){//如果数量加在了废弃的Adder上面,则执行补偿逻辑
            int likeTemp = adder.like.getAndSet(0);
            int commentTemp = adder.comment.getAndSet(0);
            //即使此后又有线程在计数器上计数了也无妨
            if(likeTemp != 0 || commentTemp != 0){
                add(key, likeTemp, commentTemp);//补偿
            }//也可能已经被其它线程取走了,但并不影响业务正确性
        }
    }
    static class Adder{
        AtomicInteger like = new AtomicInteger();
        AtomicInteger comment = new AtomicInteger();
        volatile boolean discarded = false;//只有保存线程会将它改为true,故使用volatile便能保证线程安全
    }
}

方案点评:跟乐观锁的思路类似,在竞争激烈的情况下,一般不会有最优性能,但此处因为save方法是低频执行的且自身无并发,add方法才有高并发,故失败补偿其实很少真正被执行,这也是为什么测试结果中本方案性能最优的原因


性能测试

最终我们来测试一下各方案的性能,因为我们抽象出了一个统一的接口,故测试也较为容易

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterTester {
    private static final int THREAD_SIZE = 6;//add方法的并发线程数
    private static final int ADD_SIZE = 5000000;//测试规模
    private static final int KEYS_SIZE = 128*1024;
    public static void main(String[] args) throws InterruptedException {
        Counter[] counters = new Counter[]{new SynchronizedCounter(), new IncompleteCounter(), new ReadWriteLockCounter(), new CustomLockCounter(), new CompensationCounter()};
        String[] keys = new String[KEYS_SIZE];
        Random random = new Random();
        for (int i = 0; i < keys.length; i++) {
            keys[i]=String.valueOf(random.nextInt(KEYS_SIZE*1024));
        }
        for (Counter counter : counters) {
            AtomicInteger totalLike = new AtomicInteger();
            AtomicInteger totalComment = new AtomicInteger();
            AtomicInteger savedTotalLike = new AtomicInteger();
            AtomicInteger savedTotalComment = new AtomicInteger();
            Counter.Saver saver = (key, like, comment) -> {
                savedTotalLike.addAndGet(like);//模拟被持久化到数据库,记录数量以便后续校验正确性
                savedTotalComment.addAndGet(comment);//同上
            };
            CountDownLatch latch = new CountDownLatch(THREAD_SIZE);
            long start = System.currentTimeMillis();
            for (int i = 0; i < THREAD_SIZE; i++) {
                new Thread(()->{
                    Random r = new Random();
                    int like, comment;
                    for (int j = 0; j < ADD_SIZE; j++) {
                        like = 2;
                        comment = 4;
                        counter.add(keys[r.nextInt(KEYS_SIZE)], like, comment);
                        totalLike.addAndGet(like);
                        totalComment.addAndGet(comment);
                    }
                    latch.countDown();
                }).start();
            }
            Thread saveThread = new Thread(()->{
                while(latch.getCount() != 0){
                    try {
                        Thread.sleep(100);//模拟100毫秒执行一次持久化
                    } catch (InterruptedException e) {}
                    counter.save(saver);
                }
                counter.save(saver);

            });
            saveThread.start();
            latch.await();
            System.out.println(counter.getClass().getSimpleName() +" cost:\t"+(System.currentTimeMillis() - start));
            saveThread.join();
            boolean error = savedTotalLike.get() != totalLike.get() || savedTotalComment.get() != totalComment.get();
            (error?System.err:System.out).println("saved:\tlike="+savedTotalLike.get()+"\tcomment="+savedTotalComment.get());
            (error?System.err:System.out).println("added:\tlike="+totalLike.get()+"\tcomment="+totalComment.get()+"\n");
        }
    }
}

在jdk11(jdk8也基本一致)下的测试结果如下:

注:方案二的IncompleteCounter并未完成回收,仅作对比

SynchronizedCounter cost:    12377
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

IncompleteCounter cost:    2560
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

ReadWriteLockCounter cost:    7902
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

CustomLockCounter cost:    3541
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

CompensationCounter cost:    2093
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

小结

非阻塞同步算法一般不需要我们去设计,直接使用现有的工具便可,但如果真想通过它进一步去压榨性能,应细心分析各线程穿插执行的情况,同时结合业务场景来考虑(也许在A场景不允许的情况,在B场景是允许的)

原文地址:https://www.cnblogs.com/trytocatch/p/counter-persistence.html

时间: 2024-10-18 16:31:10

非阻塞同步算法实战(四)- 计数器定时持久化的相关文章

非阻塞算法-简单的计数器

1.为什么要用非阻塞算法?我们知道为了避免并发环境下操作共享变量的问题,可以采用同步(synchronize)和锁(Lock)的方式做到线程安全,但是JVM处理锁竞争时对于竞争失败的线程采用的是挂起稍后调度的策略,这样会带来额外的线程上下文切换成本.同时和CAS(Compare And Set)这种非阻塞算法相比,CAS是在底层硬件(CPU)层面实现,只需要锁定独立的内存位置,更细的同步粒度使得CAS失败的线程可以立即重试而不用挂起.总的来说,大多数场景下非阻塞算法和同步锁相比能带来更好的性能,

Linux非阻塞IO(四)非阻塞IO中connect的实现

我们为客户端的编写再做一些工作. 这次我们使用非阻塞IO实现connect函数. int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); 非阻塞IO有以下用处: 1.将三次握手的处理过程生下来,处理其他事情. 2.使用这个同时建立多个连接. 3.实现超时connect功能,本节实现的connect就可以指定时间,超时后算作错误处理.   在阻塞IO中,调用connect后一般会阻塞,直到确定连接成功或者失败

非阻塞同步算法与CAS(Compare and Swap)无锁算法

CAS无锁算法 要实现无锁(lock-free)的非阻塞算法有多种实现方法,其中CAS(比较与交换,Compare and swap)是一种有名的无锁算法.CAS, CPU指令,在大多数处理器架构,包括IA32.Space中采用的都是CAS指令,CAS的语义是"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少",CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起

008. 阻塞&amp;非阻塞、同步&amp;异步

阻塞 非阻塞:关注的对象是调用者: 阻塞:调用者发起调用后,处于等待状态,直到该调用有返回: 非阻塞:调用者发起调用后,不需要等待返回,可以往下执行: 同步 异步:  关注的对象是被调用者: 同步:服务方(被调用者)接收到这个调用后,直到执行完成得到结果,才将结果返回调用者:    异步:服务方(被调用者)接收到这个调用后,立即返回,当执行完成得到结果后,再通过修改状态或发送消息通知调用者. 一.从分布式系统服务间调用的角度 作者:严肃链接:https://www.zhihu.com/quest

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式

1. 概念理解        在进行网络编程时,我们常常见到同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式:   同步/异步主要针对C端: 同步:      所谓同步,就是在c端发出一个功能调用时,在没有得到结果之前,该调用就不返回.也就是必须一件一件事做,等前一件做完了才能做下一件事.   例如普通B/S模式(同步):提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能干任何事 异步:      异步的概念和同步相对.当c端一个

同步、异步、阻塞、非阻塞四种调用方式

在进行网络编程时,我们常常见到同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式: 同步/异步主要针对C端:  同步(Sync) 所谓同步,就是发出一个功能调用时,在没有得到结果之前,该调用就不返回或继续执行后续操作. 根据这个定义,Java中所有方法都是同步调用,应为必须要等到结果后才会继续执行.我们在说同步.异步的时候,一般而言是特指那些需要其他端协作或者需要一定时间完成的任务. 简单来说,同步就是必须一件一件事做,等前一件做完了才能做下一件事.

《Java并发编程实战》笔记-非阻塞算法

如果在某种算法中,一个线程的失败或挂起不会导致其他线程也失败和挂起,那么这种算法就被称为非阻塞算法.如果在算法的每个步骤中都存在某个线程能够执行下去,那么这种算法也被称为无锁(Lock-Free)算法.如果在算法中仅将CAS用于协调线程之间的操作,并且能正确地实现,那么它既是一种无阻塞算法,又是一种无锁算法. 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性. 非阻塞算法的所有特性:某项工作的完成具有不确定性,必须重新执行.

Java并发编程实战 第15章 原子变量和非阻塞同步机制

非阻塞的同步机制 简单的说,那就是又要实现同步,又不使用锁. 与基于锁的方案相比,非阻塞算法的实现要麻烦的多,但是它的可伸缩性和活跃性上拥有巨大的优势. 实现非阻塞算法的常见方法就是使用volatile语义和原子变量. 硬件对并发的支持 原子变量的产生主要是处理器的支持,最重要的是大多数处理器架构都支持的CAS(比较并交换)指令. 模拟实现AtomicInteger的++操作 首先我们模拟处理器的CAS语法,之所以说模拟,是因为CAS在处理器中是原子操作直接支持的.不需要加锁. public s