多线程环境下队列操作之锁的教训

  之前一直在研究多线程环境下的编程方法,却很少实战体验,以至于我一提到多线程编程,我总是信心不足,又总是说不出到底哪里不明白。今天工程现场反馈了一个“老问题”,我一直担心的是DAServer的运行机制有什么我不明白的地方,DAS Toolkit中总有一部分是我没有仔细研究的,在我心中有阴影,所以工程出了问题我第一反应就是“会不会问题出在阴影里?”。结果,至今为止,我总结起来问题90%都是在自己编码部分。

  我的DAServer中有一个需求,就是上送某个定点数据的速度不能太快,否则后台接收不过来。于是,我设计了一个类,包含了两条队列,其中一条队列是需要上送的数据,另一条队列是历史上曾经更新过的点记录及上送时间。每次组装报文时需要检查队列中是否有某个点,以及该点是否在1.5秒内“曾经上送过”。如果上送过就等下次再来检查,如果没有上送过则上送此信息并记录一下上送时间。

  我在设计这个类时一直在关注逻辑,却忽略了这是一个多线程环境的应用——装入队列数据的线程和取出队列数据的线程是不同的。好吧,只能说明我们的应用场景太低端,多线程争用资源的场景不是太频繁,以至于我过了这么久才明白过来。这个类的设计代码如下:

 1 struct SUpdateValue
 2 {
 3     int m_dataID;            // 数据点号
 4     DASVariant m_dataValue;    // 数据值
 5     SUpdateValue* next;
 6
 7     SUpdateValue() : m_dataID(0), next(NULL){ }
 8     SUpdateValue(int id, DASVariant& val) : m_dataID(id), m_dataValue(val), next(NULL){    }
 9 };
10
11 struct SUpdateTime
12 {
13     int dataID;            // 数据点号
14     long lastSendTime;    // 上次上送缓冲数据的时间戳(毫秒)
15     SUpdateTime* next;
16
17     SUpdateTime() : dataID(0), lastSendTime(0), next(NULL){}
18 };
19
20 class CDelayQueue
21 {
22 public:
23     CDelayQueue();
24     ~CDelayQueue();
25     bool EnqueFrame(int dataID, DASVariant& dataValue);        // 向延迟队列中装入新数据
26     bool DequeFrame(int dataID, DASVariant& dataValue);        // 从延迟队列中取出指定的数据
27     void DelayUpdateTime(int dataID);    // 设定数据点的更新时间戳,一般用于防止过快上送
28     int GetFrameCount(void);            // 获取延迟队列中的对象个数
29     string GetFrameList(void);            // 获取对象名称列表,以逗号分隔
30
31 protected:
32     bool AskForUpdate(int dataID);        // 申请上送新数据
33
34 private:
35     SUpdateValue* m_pHead;        // 需要延迟发送的数据队列
36     SUpdateValue* m_pTail;
37     SUpdateTime* m_pTimeHead;    // 已经发送的数据队列的历史记录(超时后失去记录)
38 };

  实现部分如下:

  1 CDelayQueue::CDelayQueue() : m_pHead(NULL), m_pTail(NULL), m_pTimeHead(NULL)
  2 {
  3 }
  4
  5 CDelayQueue::~CDelayQueue()
  6 {
  7     while (NULL != m_pHead)
  8     {
  9         SUpdateValue* p = m_pHead;
 10         m_pHead = m_pHead->next;
 11         delete p;
 12     }
 13
 14     while (NULL != m_pTimeHead)
 15     {
 16         SUpdateTime* p = m_pTimeHead;
 17         m_pTimeHead = m_pTimeHead->next;
 18         delete p;
 19     }
 20 }
 21
 22 bool CDelayQueue::EnqueFrame(int dataID, DASVariant& dataValue)
 23 {
 24     SUpdateValue* pNew = new SUpdateValue(dataID, dataValue);
 25     if (NULL == pNew)
 26     {
 27         return false;
 28     }
 29
 30     if (NULL == m_pHead)
 31     {
 32         m_pHead = m_pTail = pNew;
 33     }
 34     else
 35     {
 36         m_pTail->next = pNew;
 37         m_pTail = m_pTail->next;
 38     }
 39
 40     return true;
 41 }
 42
 43 bool CDelayQueue::DequeFrame(int dataID, DASVariant& dataValue)
 44 {
 45     if (NULL == m_pHead)
 46     {
 47         return false;
 48     }
 49
 50     // 检查队列中是否存在该点
 51     if (m_pHead->m_dataID == dataID)
 52     {
 53         if (AskForUpdate(dataID))
 54         {
 55             dataValue = m_pHead->m_dataValue;
 56             SUpdateValue* pDel = m_pHead;
 57             m_pHead = m_pHead->next;
 58             delete pDel;
 59             return true;
 60         }
 61         return false;
 62     }
 63
 64     SUpdateValue* pPre = m_pHead;
 65     SUpdateValue* pValue = m_pHead->next;
 66     while (pValue != NULL)
 67     {
 68         if (pValue->m_dataID == dataID)
 69         {
 70             if (AskForUpdate(pValue->m_dataID))
 71             {
 72                 dataValue = pValue->m_dataValue;
 73                 pPre->next = pValue->next;
 74                 delete pValue;
 75                 return true;
 76             }
 77             return false;
 78         }
 79         pPre = pValue;
 80         pValue = pPre->next;
 81     }
 82
 83     return false;
 84 }
 85
 86 bool CDelayQueue::AskForUpdate(int dataID)
 87 {
 88     long curTime = GetTickCount();
 89
 90     // 检查是否在短时间内更新过该数据点
 91     SUpdateTime* pList = m_pTimeHead;
 92     while (pList)
 93     {
 94         if (pList->dataID == dataID)
 95         {
 96             if ((curTime - pList->lastSendTime) < 1500)        // xiaoku
 97             {
 98                 return false;
 99             }
100             pList->lastSendTime = curTime;
101             return true;
102         }
103         pList = pList->next;
104     }
105
106     // 如果记录中没有目标点,则创建历史记录
107     if (NULL == pList)
108     {
109         pList = new SUpdateTime();
110         pList->dataID = dataID;
111         pList->lastSendTime = curTime;
112         pList->next = m_pTimeHead;
113         m_pTimeHead = pList;
114     }
115
116     return true;
117 }
118
119 void CDelayQueue::DelayUpdateTime(int dataID)
120 {
121     long curTime = ::GetTickCount();
122     SUpdateTime* pList = m_pTimeHead;
123     while (pList)
124     {
125         if (pList->dataID == dataID)
126         {
127             pList->lastSendTime = curTime - 500;
128             return ;
129         }
130         pList = pList->next;
131     }
132
133     // 如果记录中没有目标点,则创建历史记录
134     if (NULL == pList)
135     {
136         pList = new SUpdateTime();
137         pList->dataID = dataID;
138         pList->lastSendTime = curTime - 500;
139         pList->next = m_pTimeHead;
140         m_pTimeHead = pList;
141     }
142 }
143
144 int CDelayQueue::GetFrameCount()
145 {
146     if (NULL == m_pHead)
147     {
148         return 0;
149     }
150
151     int count = 1;
152     SUpdateValue* p = m_pHead;
153     while(p != m_pTail)
154     {
155         ++count;
156         p = p->next;
157     }
158     return count;
159 }
160
161 string CDelayQueue::GetFrameList()
162 {
163     string strNameList("");
164     if (NULL == m_pHead)
165     {
166         return strNameList;
167     }
168     SUpdateValue* p = m_pHead;
169     char szData[16];
170     sprintf_s(szData, 16, "%d", p->m_dataID);
171     strNameList += szData;
172     while(p != m_pTail)
173     {
174         sprintf_s(szData, 16, "%d", p->next->m_dataID);
175         strNameList += ";";
176         strNameList += szData;
177         p = p->next;
178     }
179
180     return strNameList;
181 }

  最关键两个接口是EnqueFrame() 和 DequeFrame() ,分别是装入队列和从队列中取数。都是在操作队列,如果是不同的线程,怎么能不考虑线程互斥的问题呢?好吧,迅速引入LockerGuard。

  这个失败的例子放在这里警示一下自己。

时间: 2024-07-30 10:08:22

多线程环境下队列操作之锁的教训的相关文章

SQLite在多线程环境下的应用

这几天研究了一下SQLite这个嵌入式数据库在多线程环境下的应用,感觉里面的学问还挺多,于是就在此分享一下. AD: 2014WOT全球软件技术峰会北京站 课程视频发布 先说下初衷吧,实际上我经常看到有人抱怨SQLite不支持多线程.而在iOS开发时,为了不阻塞主线程,数据库访问必须移到子线程中.为了解决这个矛盾,很有必要对此一探究竟. 关于这个问题,最权威的解答当然是SQLite官网上的“Is SQLite threadsafe?”这个问答. 简单来说,从3.3.1版本开始,它就是线程安全的了

C++多线程环境下的构造函数

多线程的环境里,我们总不可避免要使用锁.于是一个常见的场景就是: 1 class ObjectWithLock 2 { 3 private: 4 std::mutex mtx_; 5 SomeResType shared_res_; 6 7 public: 8 // Constructor/Destructor 9 … 10 11 void OpOnSharedRes() 12 { 13 std::lock_guard<std::mutex> lock(mtx_); 14 15 // read

Java读写锁,多线程环境下提升效率

读写锁 package cn.sniper.thread.lock; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Cache {

单例模式在多线程环境下的lazy模式为什么要加两个if(instance==null)

刚才在看阿寻的博客”C#设计模式学习笔记-单例模式“时,发现了评论里有几个人在问单例模式在多线程环境下为什么lazy模式要加两个if进行判断,评论中的一个哥们剑过不留痕,给他们写了一个demo来告诉他们为什么. 我看了一下这个demo,确实说明了这个问题,但我认为不够直观,呵呵,于是我就稍微的改了一下. 这是剑过不留痕的demo using System; using System.Threading; namespace SingletonPattern { class Program { s

设计模式——单例模式(Java)——考虑多线程环境下的线程安全问题

设计模式--单例模式(Java)--考虑多线程环境下的线程安全问题 一:单例模式概念 单例模式是一种常用的软件设计模式.在它的核心结构中只包含一个被称为单例的特殊类.通过单例模式可以保证系统中一个类只有一个实例 二:单例模式的实现方式 特别注意,在多线程环境下,需要对获取对象实例的方法加对象锁(synchronized) 方式一:(懒汉式)程序执行过程中需要这个类的对象,再实例化这个类的对象 步骤: 1.定义静态私有对象 2.构造方法私有化保证在类的外部无法实例化该类的对象 3.定义对外开放的静

多线程环境下的线程不安全问题(1)

在不考虑多线程的情况下,很多类代码都是完全正确的,但是如果放在多线程环境下,这些代码就很容易出错,我们称这些类为 线程不安全类 .多线程环境下使用线程安全类 才是安全的. 下面是一个线程不安全类的例子: public class Account { private Integer balance; public Account(Integer balance) { super(); this. balance = balance; } public Integer getBalance() {

UNIX多线程环境下屏障功能(barrier)浅析

说起屏障这个东西,相信对于大多数朋友来说比较陌生,不过要是说起pthread_join这个函数,相信都比较熟悉.我们通常使用这个函数来等待其它线程结束,例如主线程创建一些线程,这些线程去完成一些工作,而主线程需要去等待这些线程结束.其实pthread_join就实现了一种屏障.我们可以对屏障这样理解,把屏障理解为为了协同线程之间的工作而使得某一具体线程进入等待状态的一种机制.下面我们来看看UNIX为我们提供的一个屏障——barrier. 注:与所有的线程函数一样,使用屏障barrier时需要包含

多线程环境下慎用静态变量

最近在修复一个旧的互联网应用bug,问题是程序有时候会自动拼接参数,比如正确的参数应该是f(\\d)-f(\\d)-f(\\d),而实际情况可能会出现f(\\d)-f(\\d)-f(\\d)-f(\\d)-f(\\d).查找bug的时候,从页面入手,然后研究逻辑处理,url生成规则......不断地调试,实验,应用在线下一切正常,但到了线上就会出现问题.线上页有时候刷新一下则又恢复正常.仔细研究了一下程序的逻辑,发现应该是没问题的.最后在某个地方发现程序调用了静态变量.我们都知道,程序一开始会为

HttpClient在多线程环境下踩坑总结

问题现场 在多线程环境下使用HttpClient组件对某个HTTP服务发起请求,运行一段时间之后发现客户端主机CPU利用率呈现出下降趋势,而不是一个稳定的状态. 而且,从程序日志中判断有线程处于夯住的状态,应该是被阻塞了. 问题排查 一开始找不到原因,怀疑是多线程并发导致的死锁问题,但是通过代码审查并未定位到任何可能的多线程并发问题. 甚至开始怀疑是否是因为内存资源不够引起JVM频繁GC到导致业务线程被暂停,但是从GC的日志输出结果看,GC是正常的. 于是,进入一种丈二和尚摸不着头脑头脑的状态,