1 #include "stdafx.h"
2 #include <process.h>
3 #include <stdio.h>
4 #include <Windows.h>
5 #include <stdlib.h>
6 #include <assert.h>
7
8 #define MAX_VIDEO_BUFF 1024
9
10 struct Header
11 {
12 WORD wSize;
13 char data[0];
14 };
15
16 #define TEST_MSG_MAX_SIZE 128
17 #define MAX_VIDEO_ELEMENT (TEST_MSG_MAX_SIZE + sizeof(Header))
18
19 class VideoQueue
20 {
21 public:
22 VideoQueue() : r(0), w(0), buf(0), buf_extend(0) {}
23
24 //////////////////////////////////////////////////////////////////////
25 //////////////////// WRITE ///////////////////////////////////////////
26 //////////////////////////////////////////////////////////////////////
27
28 bool Write(const char* p, int len)
29 {
30 if (GetSpaceSize() < len)
31 return false;
32
33 int toendlen = MAX_VIDEO_BUFF - w;
34 if (toendlen >= len)
35 memcpy(&buf[w], p, len);
36 else
37 {
38 memcpy(&buf[w], p, toendlen);
39 memcpy(&buf[0], p + toendlen, len - toendlen);
40 }
41
42 w = (w + len) % MAX_VIDEO_BUFF;
43 return true;
44 }
45 bool IsFull() {
46 return ((w + 1) % MAX_VIDEO_BUFF == r);
47 }
48 int GetSpaceSize()
49 {
50 int rt = r + MAX_VIDEO_BUFF - w - 1;
51 if (rt >= MAX_VIDEO_BUFF)
52 rt -= MAX_VIDEO_BUFF;
53 return rt;
54 }
55
56 //////////////////////////////////////////////////////////////////////
57 //////////////////// READ ////////////////////////////////////////////
58 //////////////////////////////////////////////////////////////////////
59
60 const char* GetDataPtr(int len)
61 {
62 if (GetDataLength() < len)
63 return 0;
64
65 if (len > MAX_VIDEO_ELEMENT)
66 return 0;
67
68 int toendlen = MAX_VIDEO_BUFF - r;
69 if (toendlen >= len)
70 return &buf[r];
71 else
72 {
73 memcpy(buf_extend, &buf[r], toendlen);
74 memcpy(((char*)buf_extend) + toendlen, &buf[0], len - toendlen);
75 return (const char*)buf_extend;
76 }
77 }
78 bool IsEmpty() {
79 return (w == r);
80 }
81 bool Skip(int len)
82 {
83 assert(GetDataLength() >= len);
84 r = (r + len) % MAX_VIDEO_BUFF;
85 return true;
86 }
87 int GetDataLength()
88 {
89 int rt = w + MAX_VIDEO_BUFF - r;
90 if (rt >= MAX_VIDEO_BUFF)
91 rt -= MAX_VIDEO_BUFF;
92 return rt;
93 }
94
95
96 //////
97 bool CreateBuff()
98 {
99 if (buf == 0)
100 buf = (char*)malloc(MAX_VIDEO_BUFF);
101
102 if (buf_extend == 0)
103 buf_extend = (char*)malloc(MAX_VIDEO_ELEMENT);
104
105 return (buf != 0);
106 }
107
108 private:
109 char* buf;//数据实际上只能写入MAX_BUFF-1
110 int r;
111 int w;
112 char* buf_extend;
113 };
114
115
116 VideoQueue* q = 0;
117
118 CRITICAL_SECTION cs;
119
120 #define TEST_MSG_COUNT 100000
121 void ThreadA(void*)
122 {
123 // 创建消息,不断向队列里填充
124 char s_Buff[TEST_MSG_MAX_SIZE + sizeof(Header)];
125 Header *pHeader = (Header *)s_Buff;
126 int iLeftCount = 0;
127 int iPos = 0;
128 int iSendSize = 0;
129 int iPackIndex = 0;
130 int iTotalSize = 0;
131
132 while (1)
133 {
134 //EnterCriticalSection(&cs);
135
136 if (q->IsFull())
137 {
138 //LeaveCriticalSection(&cs);
139 //Sleep(0);
140 continue;
141 }
142
143 if (iLeftCount == 0)
144 {
145 if (iPackIndex++ >= TEST_MSG_COUNT)
146 {
147 //LeaveCriticalSection(&cs);
148 break;
149 }
150
151 // Create a Msg;
152 pHeader->wSize = rand() % (TEST_MSG_MAX_SIZE - 32) + 32;
153 for (WORD i = 0; i < pHeader->wSize; ++i)
154 pHeader->data[i] = char(pHeader->wSize + i);
155 iTotalSize += pHeader->wSize;
156 iLeftCount = pHeader->wSize + sizeof(Header);
157 iPos = 0;
158 }
159
160 int iSpace = q->GetSpaceSize();
161 if (iLeftCount > TEST_MSG_MAX_SIZE / 4)
162 iSendSize = rand() % iLeftCount;
163 else
164 iSendSize = iLeftCount;
165
166 if (iSpace < iSendSize)
167 {
168 assert(q->Write(s_Buff + iPos, iSpace));
169 iLeftCount -= iSpace;
170 iPos += iSpace;
171 }
172 else
173 {
174 assert(q->Write(s_Buff + iPos, iSendSize));
175 iLeftCount -= iSendSize;
176 iPos += iSendSize;
177 }
178
179 //LeaveCriticalSection(&cs);
180 //Sleep(0);
181 }
182 printf("Send %d Msg, Size:%d!\n", TEST_MSG_COUNT, iTotalSize);
183
184 // __EndThreadEx
185 }
186
187 void ThreadB(void*)
188 {
189 // 不断从队列里读消息
190 Header *pHeader = NULL;
191
192 int iMsgCount = 0;
193 int iTotalSize = 0;
194 int iErrorPack = 0;
195
196 while (1)
197 {
198 //EnterCriticalSection(&cs);
199 if (q->IsEmpty())
200 {
201 //LeaveCriticalSection(&cs);
202 //Sleep(0);
203 continue;
204 }
205
206 pHeader = (Header *)q->GetDataPtr(sizeof(Header));
207 if (pHeader)
208 {
209 pHeader = (Header *)q->GetDataPtr(sizeof(Header) + pHeader->wSize);
210 if (pHeader)
211 {
212 for (WORD i = 0; i < pHeader->wSize; ++i)
213 if (pHeader->data[i] != char(pHeader->wSize + i))
214 {
215 iErrorPack++;
216 break;
217 }
218
219 iMsgCount ++;
220 iTotalSize += pHeader->wSize;
221 q->Skip(sizeof(Header) + pHeader->wSize);
222
223 if (iMsgCount >= TEST_MSG_COUNT)
224 {
225 //LeaveCriticalSection(&cs);
226 break;
227 }
228 }
229 }
230 //LeaveCriticalSection(&cs);
231 //Sleep(0);
232 }
233 printf("Recv %d Msg, Size:%d, ErrorPack:%d!\n", TEST_MSG_COUNT, iTotalSize,iErrorPack);
234
235 // __EndThreadEx
236 }
237
238 int _tmain(int argc, _TCHAR* argv[])
239 {
240 srand(1);
241 q = new VideoQueue;
242 if (!q->CreateBuff())
243 {
244 printf("createbuff fail\n");
245 getchar();
246 return 0;
247 }
248 InitializeCriticalSection(&cs);
249
250 HANDLE hWrite = (HANDLE)_beginthread(ThreadA, 0, 0);
251 HANDLE hRead = (HANDLE)_beginthread(ThreadB, 0, 0);
252
253 WaitForSingleObject(hWrite, INFINITE);
254 WaitForSingleObject(hRead, INFINITE);
255
256 getchar();
257
258 return 0;
259 }
无锁环形队列
时间: 2024-12-26 11:16:46