业务背景
在内存中,对mq消息进行分类计数。
问题描述
生产环境,运行一段时间后,发现消息队列有大量堆积。如果把计数逻辑注释掉,只接收用户访问消息而不进行处理,则mq队列无堆积。mq栈dump信息如下:
ConsumeMessageThread_75 TID: 214 STATE: WAITING
ConsumeMessageThread_75 sun.misc.Unsafe.park(Native Method)
ConsumeMessageThread_75 java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
ConsumeMessageThread_75 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
ConsumeMessageThread_75 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
ConsumeMessageThread_75 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
ConsumeMessageThread_75 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
ConsumeMessageThread_75 java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
ConsumeMessageThread_75 com.youku.paycenter.acl.service.impl.AsyncAclServiceImpl.getAtomicLong(AsyncAclServiceImpl.java:72)
ConsumeMessageThread_75 com.youku.paycenter.acl.service.impl.AsyncAclServiceImpl.count(AsyncAclServiceImpl.java:57)
ConsumeMessageThread_75 com.youku.paycenter.acl.mq.consumer.AclCountConsumer.receive(AclCountConsumer.java:70)
ConsumeMessageThread_75 com.youku.paycenter.mq.rocketmq.RocketMqPushConsumer.syncHandleMessage(RocketMqPushConsumer.java:207)
ConsumeMessageThread_75 com.youku.paycenter.mq.rocketmq.RocketMqPushConsumer.consumeMessage(RocketMqPushConsumer.java:191)
ConsumeMessageThread_75 com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
ConsumeMessageThread_75 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
ConsumeMessageThread_75 java.util.concurrent.FutureTask.run(FutureTask.java:262)
ConsumeMessageThread_75 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
ConsumeMessageThread_75 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
ConsumeMessageThread_75 java.lang.Thread.run(Thread.java:745)
分析发现有部分消息消费的线程处于等待状态,代码指向AsyncAclServiceImpl.getAtomicLong(AsyncAclServiceImpl.java:72)。
问题代码
privatefinal AtomicLong getAtomicLong(Stringkey)
{
AtomicLong atomicLong = tempData.get(key); // 注释1
while (null== atomicLong) // 注释2
{
try
{
lock.lock(); // 注释3
while (null== tempData.get(key)) // 注释4
{
atomicLong = new AtomicLong(0);
tempData.put(key, atomicLong);
}
} finally
{
lock.unlock();
}
}
return atomicLong;
}
问题分析
看问题代码,该段代码的功能是查询tempData(ConcurrentHashMap)中是否缓存的有计数器,没有的话创建一个计数器,放入缓存,然后返回。有的话,直接返回。假设并发情况下有2个线程同时执行注释1,然后依次经过注释2到达注释3处的代码。假设第一个线程获得了锁,进入了内层while循环,此时缓存依旧为空,因此会进行创建然后放入缓存的动作,然后退出,然后释放锁。之后,第2个线程在注释3处被唤醒,再次执行注释4的时候发现条件已经不成立,于是释放锁,进入外层循环,这时候问题应该很明显了,因为atomicLong在注释1处已经执行,只不过当时拿到的是null,于是在最外层,第2个线程进入了死循环。
优化代码
private final AtomicLong getAtomicLong(String key){ AtomicLong atomicLong = tempData.get(key); while (null == atomicLong) { try { lock.lock(); if (null == (atomicLong = tempData.get(key))) // 注释1 { atomicLong = new AtomicLong(0); tempData.put(key, atomicLong); } } finally { lock.unlock(); } } return atomicLong;}
改动点非常简单,在注释1处,把内层循环替换为if,最主要的是判断的时候同时对atomicLong进行赋值操作。
心得
编写高并发代码的时候,除了要格外细心,还需要尽可能的模拟真实环境的数据进行并发测试,找有经验的同事进行代码审查也是非常有必要的。