无锁环形队列


  1 #include "stdafx.h"
2 #include <process.h>
3 #include <stdio.h>
4 #include <Windows.h>
5 #include <stdlib.h>
6 #include <assert.h>
7
8 #define MAX_VIDEO_BUFF 1024
9
10 struct Header
11 {
12 WORD wSize;
13 char data[0];
14 };
15
16 #define TEST_MSG_MAX_SIZE 128
17 #define MAX_VIDEO_ELEMENT (TEST_MSG_MAX_SIZE + sizeof(Header))
18
19 class VideoQueue
20 {
21 public:
22 VideoQueue() : r(0), w(0), buf(0), buf_extend(0) {}
23
24 //////////////////////////////////////////////////////////////////////
25 //////////////////// WRITE ///////////////////////////////////////////
26 //////////////////////////////////////////////////////////////////////
27
28 bool Write(const char* p, int len)
29 {
30 if (GetSpaceSize() < len)
31 return false;
32
33 int toendlen = MAX_VIDEO_BUFF - w;
34 if (toendlen >= len)
35 memcpy(&buf[w], p, len);
36 else
37 {
38 memcpy(&buf[w], p, toendlen);
39 memcpy(&buf[0], p + toendlen, len - toendlen);
40 }
41
42 w = (w + len) % MAX_VIDEO_BUFF;
43 return true;
44 }
45 bool IsFull() {
46 return ((w + 1) % MAX_VIDEO_BUFF == r);
47 }
48 int GetSpaceSize()
49 {
50 int rt = r + MAX_VIDEO_BUFF - w - 1;
51 if (rt >= MAX_VIDEO_BUFF)
52 rt -= MAX_VIDEO_BUFF;
53 return rt;
54 }
55
56 //////////////////////////////////////////////////////////////////////
57 //////////////////// READ ////////////////////////////////////////////
58 //////////////////////////////////////////////////////////////////////
59
60 const char* GetDataPtr(int len)
61 {
62 if (GetDataLength() < len)
63 return 0;
64
65 if (len > MAX_VIDEO_ELEMENT)
66 return 0;
67
68 int toendlen = MAX_VIDEO_BUFF - r;
69 if (toendlen >= len)
70 return &buf[r];
71 else
72 {
73 memcpy(buf_extend, &buf[r], toendlen);
74 memcpy(((char*)buf_extend) + toendlen, &buf[0], len - toendlen);
75 return (const char*)buf_extend;
76 }
77 }
78 bool IsEmpty() {
79 return (w == r);
80 }
81 bool Skip(int len)
82 {
83 assert(GetDataLength() >= len);
84 r = (r + len) % MAX_VIDEO_BUFF;
85 return true;
86 }
87 int GetDataLength()
88 {
89 int rt = w + MAX_VIDEO_BUFF - r;
90 if (rt >= MAX_VIDEO_BUFF)
91 rt -= MAX_VIDEO_BUFF;
92 return rt;
93 }
94
95
96 //////
97 bool CreateBuff()
98 {
99 if (buf == 0)
100 buf = (char*)malloc(MAX_VIDEO_BUFF);
101
102 if (buf_extend == 0)
103 buf_extend = (char*)malloc(MAX_VIDEO_ELEMENT);
104
105 return (buf != 0);
106 }
107
108 private:
109 char* buf;//数据实际上只能写入MAX_BUFF-1
110 int r;
111 int w;
112 char* buf_extend;
113 };
114
115
116 VideoQueue* q = 0;
117
118 CRITICAL_SECTION cs;
119
120 #define TEST_MSG_COUNT 100000
121 void ThreadA(void*)
122 {
123 // 创建消息,不断向队列里填充
124 char s_Buff[TEST_MSG_MAX_SIZE + sizeof(Header)];
125 Header *pHeader = (Header *)s_Buff;
126 int iLeftCount = 0;
127 int iPos = 0;
128 int iSendSize = 0;
129 int iPackIndex = 0;
130 int iTotalSize = 0;
131
132 while (1)
133 {
134 //EnterCriticalSection(&cs);
135
136 if (q->IsFull())
137 {
138 //LeaveCriticalSection(&cs);
139 //Sleep(0);
140 continue;
141 }
142
143 if (iLeftCount == 0)
144 {
145 if (iPackIndex++ >= TEST_MSG_COUNT)
146 {
147 //LeaveCriticalSection(&cs);
148 break;
149 }
150
151 // Create a Msg;
152 pHeader->wSize = rand() % (TEST_MSG_MAX_SIZE - 32) + 32;
153 for (WORD i = 0; i < pHeader->wSize; ++i)
154 pHeader->data[i] = char(pHeader->wSize + i);
155 iTotalSize += pHeader->wSize;
156 iLeftCount = pHeader->wSize + sizeof(Header);
157 iPos = 0;
158 }
159
160 int iSpace = q->GetSpaceSize();
161 if (iLeftCount > TEST_MSG_MAX_SIZE / 4)
162 iSendSize = rand() % iLeftCount;
163 else
164 iSendSize = iLeftCount;
165
166 if (iSpace < iSendSize)
167 {
168 assert(q->Write(s_Buff + iPos, iSpace));
169 iLeftCount -= iSpace;
170 iPos += iSpace;
171 }
172 else
173 {
174 assert(q->Write(s_Buff + iPos, iSendSize));
175 iLeftCount -= iSendSize;
176 iPos += iSendSize;
177 }
178
179 //LeaveCriticalSection(&cs);
180 //Sleep(0);
181 }
182 printf("Send %d Msg, Size:%d!\n", TEST_MSG_COUNT, iTotalSize);
183
184 // __EndThreadEx
185 }
186
187 void ThreadB(void*)
188 {
189 // 不断从队列里读消息
190 Header *pHeader = NULL;
191
192 int iMsgCount = 0;
193 int iTotalSize = 0;
194 int iErrorPack = 0;
195
196 while (1)
197 {
198 //EnterCriticalSection(&cs);
199 if (q->IsEmpty())
200 {
201 //LeaveCriticalSection(&cs);
202 //Sleep(0);
203 continue;
204 }
205
206 pHeader = (Header *)q->GetDataPtr(sizeof(Header));
207 if (pHeader)
208 {
209 pHeader = (Header *)q->GetDataPtr(sizeof(Header) + pHeader->wSize);
210 if (pHeader)
211 {
212 for (WORD i = 0; i < pHeader->wSize; ++i)
213 if (pHeader->data[i] != char(pHeader->wSize + i))
214 {
215 iErrorPack++;
216 break;
217 }
218
219 iMsgCount ++;
220 iTotalSize += pHeader->wSize;
221 q->Skip(sizeof(Header) + pHeader->wSize);
222
223 if (iMsgCount >= TEST_MSG_COUNT)
224 {
225 //LeaveCriticalSection(&cs);
226 break;
227 }
228 }
229 }
230 //LeaveCriticalSection(&cs);
231 //Sleep(0);
232 }
233 printf("Recv %d Msg, Size:%d, ErrorPack:%d!\n", TEST_MSG_COUNT, iTotalSize,iErrorPack);
234
235 // __EndThreadEx
236 }
237
238 int _tmain(int argc, _TCHAR* argv[])
239 {
240 srand(1);
241 q = new VideoQueue;
242 if (!q->CreateBuff())
243 {
244 printf("createbuff fail\n");
245 getchar();
246 return 0;
247 }
248 InitializeCriticalSection(&cs);
249
250 HANDLE hWrite = (HANDLE)_beginthread(ThreadA, 0, 0);
251 HANDLE hRead = (HANDLE)_beginthread(ThreadB, 0, 0);
252
253 WaitForSingleObject(hWrite, INFINITE);
254 WaitForSingleObject(hRead, INFINITE);
255
256 getchar();
257
258 return 0;
259 }

无锁环形队列

时间: 2024-12-26 11:16:46

无锁环形队列的相关文章

眉目传情之并发无锁环形队列的实现

眉目传情之并发无锁环形队列的实现 Author:Echo Chen(陈斌) Email:[email protected] Blog:Blog.csdn.net/chen19870707 Date:October 10th, 2014 前面在<眉目传情之匠心独运的kfifo>一文中详细解析了 linux  内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来.剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不

再看无锁环形队列

今天通过打印头和尾的值来看里面的规律,数学不好真看不懂下面的意思 int size = (m_nTail - m_nHead + MAX_LEN) % MAX_LEN; int size = (m_nHead - m_nTail + MAX_LEN - 1) % MAX_LEN; 但通过打印头和尾的值能发现其中的规律 队列为空:当头和尾的值相等时表示队列是空的 队列为满:当头和尾相差为1时,当然是尾索引向右移动一格时正好到达头部这种情况 对于为什么是线程安全,因为读线程只操作头指针,写线程只操作

生产者消费者模式下的并发无锁环形缓冲区

上一篇记录了几种环形缓冲区的设计方法和环形缓冲区在生产者消费者模式下的使用(并发有锁),这一篇主要看看怎么实现并发无锁. 0.简单的说明 首先对环形缓冲区做下说明: 环形缓冲区使用改进的数组版本,缓冲区容量为2的幂 缓冲区满阻塞生产者,消费者进行消费后,缓冲区又有可用资源,由消费者唤醒生产者 缓冲区空阻塞消费者,生产者进程生产后,缓冲区又有可用资源,由生产者唤醒消费者 然后对涉及到的几个技术做下说明: ⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本

环形缓冲区的设计及其在生产者消费者模式下的使用(并发有锁环形队列)

1.环形缓冲区 缓冲区的好处,就是空间换时间和协调快慢线程.缓冲区可以用很多设计法,这里说一下环形缓冲区的几种设计方案,可以看成是几种环形缓冲区的模式.设计环形缓冲区涉及到几个点,一是超出缓冲区大小的的索引如何处理,二是如何表示缓冲区满和缓冲区空,三是如何入队.出队,四是缓冲区中数据长度如何计算. ps.规定以下所有方案,在缓冲区满时不可再写入数据,缓冲区空时不能读数据 1.1.常规数组环形缓冲区 设缓冲区大小为N,队头out,队尾in,out.in均是下标表示: 初始时,in=out=0 队头

一个无锁消息队列引发的血案:怎样做一个真正的程序员?(二)——月:自旋锁

前续 一个无锁消息队列引发的血案:怎样做一个真正的程序员?(一)——地:起因 一个无锁消息队列引发的血案:怎样做一个真正的程序员?(二)——月:自旋锁 平行时空 在复制好上面那一行我就先停下来了,算是先占了个位置,虽然我知道大概要怎么写,不过感觉还是很乱. 我突然想到,既然那么纠结,那么混乱,那么不知所措,我们不如换个视角.记得高中时看过的为数不多的长篇小说<穆斯林的葬礼>,作者是:霍达(女),故事描写了两个发生在不同时代.有着不同的内容却又交错扭结的爱情悲剧,一个是“玉”的故事,一个是“月”

一个无锁消息队列引发的血案(六)——RingQueue(中) 休眠的艺术 [续]

目录 (一)起因 (二)混合自旋锁 (三)q3.h 与 RingBuffer (四)RingQueue(上) 自旋锁 (五)RingQueue(中) 休眠的艺术 (六)RingQueue(中) 休眠的艺术 [续] 开篇 这是第五篇的后续,这部分的内容同时会更新和添加在 第五篇:RingQueue(中) 休眠的艺术 一文的末尾. 归纳 紧接上一篇的末尾,我们把 Windows 和 Linux 下的休眠策略归纳总结一下,如下图: 我们可以看到,Linux 下的 sched_yield() 虽然包括了

linux内核无锁缓冲队列kfifo原理

Linux kernel里面从来就不缺少简洁,优雅和高效的代码 比如,通过限定写入的数据不能溢出和内存屏障实现在单线程写单线程读的情况下不使用锁.因为锁是使用在共享资源可能存在冲突的情况下.还用设置buffer缓冲区的大小为2的幂次方,以简化求模运算,这样求模运算就演变为 (fifo->in & (fifo->size - 1)).通过使用unsigned int为kfifo的下标,可以不用考虑每次下标超过size时对下表进行取模运算赋值,这里使用到了无符号整数的溢出回零的特性.由于指

无锁队列--基于linuxkfifo实现

一直想写个无锁的队列,来提高项目后台的效率. 偶然看到linux内核的kfifo.h 实现原理.于是自己仿照了这个实现,目前linux应该是可以对外提供接口了. #ifndef _NO_LOCK_QUEUE_H_ #define _NO_LOCK_QUEUE_H_ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <string> #include <pthread.

基于无锁队列和c++11的高性能线程池

基于无锁队列和c++11的高性能线程池线程使用c++11库和线程池之间的消息通讯使用一个简单的无锁消息队列适用于linux平台,gcc 4.6以上 标签: <无> 代码片段(6)[全屏查看所有代码] 1. [代码]lckfree.h ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // lckfree.h //