使用内存映射开发高性能进程间消息通信组件

一、背景

  项目开发中免不了各模块或系统之间进行消息通信,目前热门的消息中间件有Redis、RabbitMQ、Kafka、RocketMQ等等。

以上几种组件中Redis在消息队列方面表现还可以,但是如果涉及发布订阅功能,就不行了,最近项目就使用了redis的发布订阅,

每秒只能发出几千条,虽然目前绰绰有余,但是瓶颈可以预期。

  其余的几种都是比较重量级的消息中间件,什么跨平台、分布式、集群、支持N种协议等等,很高大全,

我们可能就只使用了其中1、2个功能。严格来说,项目中集成这几种MQ的工作量是不小的,对于中小型系统来说,可能维护MQ

稳定的工作量都比项目还大,难度也高,所有功能用全了的程序员恐怕不多。

  从长远考虑出发,选择重量级MQ恐怕是板上钉钉的事,但是项目一开始就上这几种,我觉得那也是欠缺考虑的。如果项目

根本不要求跨机器通信,那杀鸡就不要用牛刀了。比如,你只是在模块之间、线程之间、进程之间,或者是在同一主机的各种不同系统之间,

其实都可以不用重量级MQ。当然你使用了也没事,看个人选择。

  最近的项目有这么个场景,采集近所有底层设备,每个设备有点3000个,总共20多万个点需要采集上来。刚开始使用了Redis的发布订阅,

但是程序毫无疑问地挂了,根本带不起来;因为程序启动时每个点的值都是从0变成N,就需要发消息出来,那一开始消息是很多的,redis根本

处理不完,而且有很高频率的超时断线。以至于想换RabbitMQ,后来想想还是算了,因为那样增加项目难度不说,后期维护也是个难题。

说到底这是模块之间的通信,是主程序(Winform)调用采集C++的DLL类库,发出消息后主程序和web端订阅,在主程序与DLL这边,在DLL

方法上增加一个回调函数就搞定了,完全不用走消息中间件,Web端要哪些点的实时值就先ASK,先请求需要看哪些点,如何在主程序这边

发布那些点的实时值消息,这样发布订阅的数据量少了2、3个数量级不止。

二、需求

  针对上边的业务场景,因为是模块之间的线程间通信,这样搞问题不大;如果是进程之间也要那么高频率的通信,那就不好办了,我们

不想使用重量级MQ,又想高频率传输消息,怎么办呢?网上搜索了一番,貌似没看到有成熟的速度又快、体量又小,部署又简单的中间件。

所以在下不才,针对这个问题抛砖引玉,做一个demo出来供大家讨论一下。

三、原理

  应题,就是使用内存映射来做同一个机器下各种消息的通信,之前也写过一篇关于使用共享内存实现快速读写的文章,点击前往浏览

“.net环境下跨进程、高频率读写数据”,但是内存映射比较适合做消息队列,因为消息可以持久化在本地,没读完下次进来还可以接着读。

我预想是这样设计:

1、发布订阅涉及到2个主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);

2、为每个channel生成一个文件:channel.db,默认每个db可以存储1000个同类型的结构体消息作为消息队列,从头部写入,尾部读出。

   每个db文件前面留一个索引区作为发布方与订阅方各自的读写位置。发布与订阅前,先读写这个索引区,因为是一对一读写,所以

可以完美避开读写锁,大大提高性能。

3、针对一对多需求,单独设计一个config.db文件存储种channel与其相关订阅信息,大概原理图如下:

4、解决读写不加锁问题

我们看结构体:SIndex有三个属性

1) WriteIndex 记录发布方(Pubish)最后写入数据的位置

2) ReadIndex 记录订阅方(Subscribe)最后读取数据的位置

3) Over 表示WriteIndex已达到队列最大值,再WriteIndex小于等于队列最大值前,读写如下图:

WriteIndex达到最大值后再往下写Over就要取反,如由False变为True。WriteIndex=0

如果此时没有订阅方,那新消息就会被抛弃,因为已无空间存储。

4) 如果ReadIndex数值到队列最大值,Over也取反,此时ReadIndex = 0,读写又变成图1所示

5) 读写过程中并不存在互斥的情况,只要管理好读写位置,就可以避免加锁。

四、接口设计

4.1、主要参数定义

#define FM_MAX_CHANNEL		100		// 暂定最多100个不同频道
#define FM_MAX_SUBSCRIBE	3		// 暂定最多3个订阅用户
#define FM_MAX_ROWS			1000	// 暂定最多队列大小为1000
#define FM_DISCONNECT_TIME  5000	// 超过5000毫秒无心跳更新视为订阅断开
#define FM_KEEP_CONN_CYCLE  1000	// 保持心跳连接的时间周期
#define FM_NOTHING			-1		// 空白,数组为0等
#define FM_WORD_SIZE		sizeof(WORD)	// WORD长度
#define FM_INDEX_SIZE		sizeof(SIndex)  // SIndex长度 

4.2、结构体

 1 // 索引
 2 typedef struct
 3 {
 4     WORD WriteIndex;
 5     WORD ReadIndex;
 6     WORD Over;         // 当W或R超过MAX一次,Over取反一次,Over默认为False
 7 }SIndex;
 8
 9 // 内存映射参数
10 typedef struct
11 {
12     HANDLE FileHandle;
13     HANDLE FileMappingHandle;
14     LPVOID MapViewOfFileHandle;
15     UINT StructSize;
16     char FileName[20];
17     UINT SubscribeIndex;
18     WORD Conned;
19 }SDbConnInfo;
20
21 // 频道
22 typedef struct
23 {
24     char ChannelName[20];
25     UINT StructSize;
26     DWORD Subscribe1LastTime;
27     DWORD Subscribe2LastTime;
28     DWORD Subscribe3LastTime;
29 }SChannel;
30
31 // 频道与订阅映射
32 typedef struct
33 {
34     char ChannelName[20];
35     SDbConnInfo DbConnInfo[FM_MAX_SUBSCRIBE];
36 }SChannelMapDbConnInfo;

4.3、主要方法

	// 发布信息
	template<typename T>
	int Publish(const char *channel, T* data);

	// 订阅信息
	template<typename T>
	void Subscribe(const char *channel, SubscribeCallBackHandle callback);

五、代码实现

5.1 、FMDBManager,主要管理内存映射相关操作,因为是读写位置不一样,所以不需要加互斥量

  1 class FMDBManager
  2 {
  3 public:
  4     FMDBManager() {};
  5     ~FMDBManager() {};
  6
  7 public:
  8     static int Create(SDbConnInfo *info)
  9     {
 10         CString fileName(info->FileName);
 11         DWORD totalSize = (FM_MAX_ROWS * info->StructSize) + FM_INDEX_SIZE;
 12
 13         info->FileHandle = CreateFile(fileName, (GENERIC_READ | GENERIC_WRITE), (FILE_SHARE_READ | FILE_SHARE_WRITE),
 14             NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
 15
 16         info->FileMappingHandle = CreateFileMapping(info->FileHandle, NULL, PAGE_READWRITE, 0, totalSize, NULL);
 17
 18         if(info->FileMappingHandle == NULL || info->FileMappingHandle == INVALID_HANDLE_VALUE)
 19         {
 20             Log("");
 21             CloseHandle(info->FileHandle);
 22             return enumFail;
 23         }
 24
 25         if(GetLastError() == ERROR_ALREADY_EXISTS)
 26         {
 27             Log("");
 28             return enumFail;
 29         }
 30
 31         // init
 32         info->MapViewOfFileHandle = MapViewOfFile(info->FileMappingHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize);
 33
 34         if(info->MapViewOfFileHandle == NULL)
 35         {
 36             Log("");
 37             CloseHandle(info->FileMappingHandle);
 38             CloseHandle(info->FileHandle);
 39             return enumFail;
 40         }
 41
 42         return enumSuccess;
 43     }
 44
 45 protected:
 46     int Write(void *data, UINT order, SDbConnInfo *info)
 47     {
 48         if(info->MapViewOfFileHandle == NULL)
 49         {
 50             Log("");
 51             return enumFail;
 52         }
 53         else
 54             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, data, info->StructSize);
 55
 56         return enumSuccess;
 57     }
 58     int Read(void *data, UINT order, SDbConnInfo *info)
 59     {
 60         if(info->MapViewOfFileHandle == NULL)
 61         {
 62             Log("");
 63             return enumFail;
 64         }
 65         else
 66             memcpy(data, (char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, info->StructSize);
 67
 68         return enumSuccess;
 69     }
 70     int Delete(UINT order, SDbConnInfo *info)
 71     {
 72         if(info->MapViewOfFileHandle == NULL)
 73         {
 74             Log("");
 75             return enumFail;
 76         }
 77         else
 78             memset((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, 0, info->StructSize);
 79
 80         return enumSuccess;
 81     }
 82
 83     int WriteConfig(void *data, UINT order, UINT pos, UINT size, SDbConnInfo *info)
 84     {
 85         if(info->MapViewOfFileHandle == NULL)
 86         {
 87             Log("");
 88             return enumFail;
 89         }
 90         else
 91             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE + pos, data, size);
 92
 93         return enumSuccess;
 94     }
 95     int WriteIndex(void *data, UINT pos, UINT size, SDbConnInfo *info)
 96     {
 97         if(info->MapViewOfFileHandle == NULL)
 98         {
 99             Log("");
100             return enumFail;
101         }
102         else
103             memcpy((char *)info->MapViewOfFileHandle + pos, data, size);
104
105         return enumSuccess;
106     }
107     int ReadIndex(SIndex *sIndex, SDbConnInfo *info)
108     {
109         if(info->MapViewOfFileHandle == NULL)
110         {
111             Log("");
112             return enumFail;
113         }
114         else
115             memcpy(sIndex, (char *)info->MapViewOfFileHandle, FM_INDEX_SIZE);
116
117         return enumSuccess;
118     }
119 };

5.2、FMDBClient,内存映射客户端,主要封装Publish与Subscribe方法给前端调用,屏蔽复杂性

  1 class FMDBClient : public FMDBManager
  2 {
  3 private:
  4     mutable std::mutex mut;
  5     SChannelMapDbConnInfo channelMapDbConnInfo = { 0 };
  6
  7     bool CanWrite(SIndex *sIndex)
  8     {
  9         int nextWriteIndex = sIndex->WriteIndex + 1;
 10         if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
 11
 12         return nextWriteIndex != sIndex->ReadIndex;
 13     }
 14     bool CanRead(SIndex *sIndex) {
 15         if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
 16         else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
 17     }
 18
 19     int GetDbConnInfo(const char *channel, int size)
 20     {
 21         int rest = enumFail;
 22
 23         for(int i = 0; i < FM_MAX_CHANNEL; i++)
 24         {
 25             char channelNameTmp[20] = { 0 };
 26             sprintf_s(channelNameTmp, "%s", channel);
 27
 28             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
 29             {
 30                 channelMapDbConnInfo = channelMapDbConnInfoArray[i];
 31                 rest = enumSuccess;
 32                 break;
 33             }
 34         }
 35
 36         return rest;
 37     }
 38     int SetDbConnInfo(const char *channel, UINT *subscribeIndex, SDbConnInfo *dbConnInfo)
 39     {
 40         std::lock_guard<std::mutex> lk(mut);
 41
 42         int nextSubscribeIndex = fmdbConfig->GetNextSubscribeIndex(channel);
 43         if(nextSubscribeIndex == FM_NOTHING)
 44         {
 45             SChannel sChannel = { 0 };
 46             sprintf_s(sChannel.ChannelName, "%s", channel);
 47             sChannel.Subscribe1LastTime = GetTickCount();
 48             sChannel.StructSize = dbConnInfo->StructSize;
 49
 50             sprintf_s(dbConnInfo->FileName, "%s.1.db", channel);
 51             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
 52             {
 53                 dbConnInfo->SubscribeIndex = 1;
 54                 *subscribeIndex = dbConnInfo->SubscribeIndex;
 55
 56                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Insert(&sChannel);
 57                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //还原名称
 58             }
 59         }
 60
 61         if(nextSubscribeIndex > 1)
 62         {
 63             sprintf_s(dbConnInfo->FileName, "%s.%d.db", channel, nextSubscribeIndex);
 64             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
 65             {
 66                 dbConnInfo->SubscribeIndex = nextSubscribeIndex;
 67                 *subscribeIndex = nextSubscribeIndex;
 68
 69                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Save(channel, nextSubscribeIndex);
 70                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //还原名称
 71             }
 72         }
 73
 74         return enumFail;
 75     }
 76     bool SetSubscribeConned(const char *channel, int subscribeIndex, SDbConnInfo *dbConnInfo)
 77     {
 78         int rest = enumFail;
 79
 80         if(subscribeIndex <= 0) return rest;
 81
 82         for(int i = 0; i < FM_MAX_CHANNEL; i++)
 83         {
 84             char channelNameTmp[20] = { 0 };
 85             sprintf_s(channelNameTmp, "%s", channel);
 86
 87             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
 88             {
 89                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].SubscribeIndex = dbConnInfo->SubscribeIndex;
 90                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].Conned = 1;
 91                 rest = enumSuccess;
 92                 break;
 93             }
 94         }
 95
 96         return rest;
 97     }
 98     bool IsConning(SDbConnInfo *dbConnInfo) { return true; };
 99
100 public:
101     FMDBClient()
102     {
103         while(!fmdbConfigLoadFinish) { Sleep(200); }
104     };
105     ~FMDBClient() {};
106
107 public:
108     int failTimes = 0;
109
110     template<typename T>
111     int Publish(const char *channel, T* data)
112     {
113         int rest = enumFail;
114
115         // 查找
116         if(GetDbConnInfo(channel, sizeof(T)) == enumFail)
117         {
118             printf_s("发布%s失败.\n", channel);
119             return enumFail;
120         }
121
122         for(int i = 0; i < FM_MAX_SUBSCRIBE; i++)
123         {
124             if(channelMapDbConnInfo.DbConnInfo[i].FileHandle == NULL) continue;
125
126             while(IsConning(&channelMapDbConnInfo.DbConnInfo[i]))
127             {
128                 SIndex sIndex = { 0 };
129                 if(ReadIndex(&sIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumFail)
130                 {
131                     throw "映射文件加载失败";
132                 }
133
134                 if(CanWrite(&sIndex))
135                 {
136                     WORD writeIndex = sIndex.WriteIndex;
137                     if(Write(data, writeIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumSuccess)
138                     {
139                         writeIndex++;
140                         if(writeIndex > FM_MAX_ROWS)
141                         {
142                             writeIndex = 0;
143
144                             WORD Over = TRUE;
145                             WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
146                         }
147
148                         rest = WriteIndex(&writeIndex, 0, FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
149                         break;
150                     }
151                 }
152                 else
153                 {
154                     failTimes++;
155                 }
156             }
157         }
158
159         return rest;
160     }
161
162     template<typename T>
163     void Subscribe(const char *channel, SubscribeCallBackHandle callback)
164     {
165         SDbConnInfo dbConnInfo = { 0 };
166         dbConnInfo.StructSize = sizeof(T);
167
168         UINT subscribeIndex = 0;
169         if(SetDbConnInfo(channel, &subscribeIndex, &dbConnInfo) == enumFail)
170         {
171             printf_s("订阅%s失败.\n", channel);
172             return;
173         }
174
175         while(IsConning(&dbConnInfo))
176         {
177             SetSubscribeConned(channel, subscribeIndex, &dbConnInfo);
178
179             SIndex sIndex = { 0 };
180             if(ReadIndex(&sIndex, &dbConnInfo) == enumFail) throw "映射文件加载失败";
181             if(!CanRead(&sIndex)) continue;
182
183             T t = { 0 };
184             int readIndex = sIndex.ReadIndex;
185             if(Read(&t, readIndex, &dbConnInfo) == enumSuccess)
186             {
187                 readIndex++;
188                 if(readIndex > FM_MAX_ROWS)
189                 {
190                     readIndex = 0;
191
192                     WORD Over = FALSE;
193                     WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &dbConnInfo);
194                 }
195
196                 if(WriteIndex(&readIndex, FM_WORD_SIZE, FM_WORD_SIZE, &dbConnInfo) == enumSuccess)
197                     if(Delete(sIndex.ReadIndex, &dbConnInfo) == enumSuccess)
198                         if(callback(&t) == enumBreak) break;
199             }
200         }
201     }
202 };

请注意上边控制读写的2个方法

	bool CanWrite(SIndex *sIndex)
	{
		int nextWriteIndex = sIndex->WriteIndex + 1;
		if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;

		return nextWriteIndex != sIndex->ReadIndex;
	}
	bool CanRead(SIndex *sIndex)
	{
		if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
		else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
	}

我们可以分析一下,下一个WriteIndex值如果大于队列最大值 WriteIndex置0,下一个WriteIndex数值如果不等于

正在读的位置ReadIndex就能写;如果WriteIndex没有超出最大值,只要ReadIndex小于等于WriteIndex就能读,

如果超出,就判断ReadIndex大于WriteIndex就能读。WriteIndex与ReadIndex数值在Publish与Subscribe中维护

5.3、建立新线程获取最新订阅的客户端信息,这个功能主要是动态地像多个Subscribe端发生消息,比如订阅发生在发布之后,

也应该能收到消息。

 1 void Update()
 2 {
 3     while(true)
 4     {
 5         if(fmdbConfig->GetChannelArray() == enumSuccess)
 6         {
 7             for(int i = 0; i < FM_MAX_CHANNEL; i++)
 8             {
 9                 if(fmdbConfig->IsFM_NOTHING(channelMapDbConnInfoArray[i].ChannelName)) continue;
10
11                 for(int j = 0; j < FM_MAX_SUBSCRIBE; j++)
12                 {
13                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].StructSize <= 0) continue;
14
15                     // KeepConned
16                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].Conned)
17                     {
18                         fmdbConfig->KeepConned(channelMapDbConnInfoArray[i].ChannelName,
19                             channelMapDbConnInfoArray[i].DbConnInfo[j].SubscribeIndex);
20
21                         channelMapDbConnInfoArray[i].DbConnInfo[j].Conned = 0;
22                         //printf_s("%s.KeepConned.\n", channelDbParsArray[i].SDbPars[j].Channel);
23                     }
24
25                     if(!fmdbConfig->Exists(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName))
26                     {
27                         FMDBManager::Create(&channelMapDbConnInfoArray[i].DbConnInfo[j]);
28                         fmdbConfig->AddChannel(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName);
29                     }
30                 }
31             }
32         }
33
34         fmdbConfigLoadFinish = true;
35         Sleep(1000);
36     }
37 }
38 thread th(Update);

六、Demo测试

6.1、Producer.cpp

 1 #include "pch.h"
 2 #include "../FMDB.h"
 3
 4 using namespace std;
 5
 6 int main()
 7 {
 8     FMClient * client = new FMClient();
 9
10     int times = 0;
11     int index = 0;
12     int total = 0;
13     UINT structSize = sizeof(SPerson);
14     DWORD dwStartTmp = GetTickCount();
15
16     while(TRUE)
17     {
18         times++;
19         if(index == 0)
20         {
21             dwStartTmp = GetTickCount();
22         }
23
24         SPerson sPerson = { 0 };
25         sPerson.Idx = index;
26         sprintf_s(sPerson.Name, "Name.%d", index);
27         sPerson.Age = index;
28
29         if(client->Publish("Person", &sPerson) == enumSuccess)
30         {
31             if(index % 2 == 0) total = total + sPerson.Idx;
32             else total = total - sPerson.Idx;
33
34             index++;
35             if(index % 50000 == 0)
36                 printf_s("发送条数: %d, 耗时:%d \n", index, (GetTickCount() - dwStartTmp));
37         }
38
39         if(index >= 2000000) break;
40     }
41
42     printf_s("调用次数: %d, 成功条数: %d, 检验值: %d \n", times, index, total);
43     system("pause");
44 }

6.2、Consumer.cpp

 1 #include "pch.h"
 2 #include "../FMDB.h"
 3
 4 using namespace std;
 5
 6 int index = 0;
 7 int total = 0;
 8 DWORD dwStartTmp = GetTickCount();
 9
10 int SubscribeCallback(void *msg)
11 {
12     SPerson * person = (SPerson *)msg;
13
14     if(index == 0)
15     {
16         dwStartTmp = GetTickCount();
17     }
18
19     if(index % 2 == 0) total = total + person->Idx;
20     else total = total - person->Idx;
21
22     index++;
23     if(index % 50000 == 0)
24     {
25         printf("接收条数: %d, 耗时:%d, Idx:%d, Name:%s, Age:%d\n",
26             index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);
27     }
28
29     if(index >= 2000000)
30     {
31         return enumBreak;
32     }
33
34     return enumSuccess;
35 };
36
37 int main()
38 {
39     FMClient * client = new FMClient();
40     client->Subscribe<SPerson>("Person", SubscribeCallback);
41
42     printf("接收条数: %d, 检验值: %d \n", index, total);
43     system("pause");
44 }

6.3、运行,测试用例中使用了向队列发送200万条数据,消息大小128字节,订阅端也是接受到200万数据后退出,并且打印检验值。

1) 检验值计算:0+1-2+3-4+ ---------  - 2000000 = -1000000,如果队列运行正常,那两边的检验值应该都是是 -1000000.

2) 每5万条打印一次日志,运行情况如下

一对一方式运行三次,分别耗时(毫秒):2886、2979、2871

3) 一对二方式运行三次,分别耗时(毫秒):4087、4009、4040

4)运行过程中产生的文件

6.4、200万数据一对一耗时近3秒,貌似也不是非常快是不是?但是这就是最大速度了吗?

当然不是哦,别忘了这是debug版本,我们切换到release版本看速度会不会有所提升。

一对一运行三次耗时分别是:1224、1373、1326

厉害了,

SPerson结构体128字节,每秒可以处理180万数据,当然实际运用肯定达不到,因为处理其他业务逻辑也要耗时间。

好了,为了这个demo脑壳都想疼了,思考模型,调试BUG,期间各种问题,实在茶壶煮饺子,有苦说不出。

你看,又浪费我周末2天时间,期间就吃了一餐,今天的还没吃呢,等下去旁边山上走走,不然就要发霉了。拜拜。。。

原文地址:https://www.cnblogs.com/lanxiaoke/p/10228355.html

时间: 2024-09-30 06:53:57

使用内存映射开发高性能进程间消息通信组件的相关文章

在Windows系统上实现轻量级的线程间及进程间消息队列

看IaaS 资料时,捎带研究下硬件虚拟化,主要参考<基于intel VT-x 的Xen 全虚拟化实现>,<intel 开发手册 第三卷 19/20章> Intel VT 是intel X86架构的CPU硬件虚拟化技术,新增两种模式: VM root: 即虚拟机管理系统运行模式: VM non root:即虚拟机运行模式: 如下图: VMXON.VMXOFF用以实现打开或关闭虚拟化功能: VM Exit和VM Entry 用以实现non root和root之间的切换:这种转换被VMC

Erlang进程间消息接收超时设定

Erlang消息接收函数,一般都会设计成尾递归调用自己的模式.但是这样的模式,如果没有消息则会无限的等待下去,所以为了不无限等待,这里可以加个超时设定,例如: flush() -> receive _ -> flush() after 1000 -> ok end. 有个特殊情况是,当超时时间设定为0时,程序不是立马退出,而是先将message box中的消息匹配完后,再返回. 更多进程消息信息请戳这里 Erlang进程间消息接收超时设定,布布扣,bubuko.com

python全栈开发day32-进程创建,进程同步,进程间的通信,进程池

一.内容总结 1.进程创建 1) Process:两种创建一个新进程的方法: 1.实例化Process,通过args=(,)元组形式传参,2创建类继承Process,类初始化的时候传参数 2) p.join(),阻塞主进程,执行完p进程后,释放 3)  守护进程 ,守护主程序代码执行完毕,p.daemon = True import time from multiprocessing import Process def func(): while True: print('is alive')

python全栈开发day33-进程间的通信、进程间的数据共享,进程池

一.昨日内容回顾: 1.  守护进程 1).p.saemon, 2 ).p.terminate 3 ).p.join 2.  同步控制 1).锁,Lock 互斥锁,解决数据安全.进程之间资源抢占问题. 2).信号量,Semaphore 锁+计数器 3).事件,Event 通过一个标志位flag来控制进程的阻塞和执行. 3.  多进程实现tcp协议的socket的sever端 1)子进程中不能使用input 2)允许端口的重用设置 3)妥善处理sk的close确保操作系统的资源能够被及时回收. i

Linux进程间的通信

一.管道 管道是Linux支持的最初Unix IPC形式之一,具有以下特点: A. 管道是半双工的,数据只能向一个方向流动: B. 需要双工通信时,需要建立起两个管道: C. 只能用于父子进程或者兄弟进程之间(具有亲缘关系的进程): D. 单独构成一种独立的文件系统:管道对于管道两端的进程而言,就是一个文件,但它不是普通的文件,它不属于某种文件系统,而是自立门户,单独构成一种文件系统,并且只存在与内存中. 匿名管道的创建:该函数创建的管道的两端处于一个进程中间,在实际应用中没有太大意义;因此,一

Linux进程间的通信方法

linux进程间的通信方法总结如下 通过fork函数把打开文件的描述符传递给子进程 通过wait得到子进程的终结信息 通过加锁的方式,实现几个进行共享读写某个文件 进行间通过信号通信,SIGUSR1和SIGUSR2实现用户定义功能 利用pipe进行通信 FIFO文件进行通信 mmap,几个进程映射到同一内存区 SYS IPC 消息队列,信号量(很少用) UNIX Domain Socket,常用

linux 进程间的通信

现在linux使用的进程间通信方式:(1)管道(pipe)和有名管道(FIFO)(2)信号(signal)(3)消息队列(4)共享内存(5)信号量(6)套接字(socket) 为何进行进程间的通信:A.数据传输:一个进程需要将它的数据发送给另一个进程,发送的数据量在一个字节到几M字节之间B.共享数据:多个进程想要操作共享数据,一个进程对共享数据的修改,别的进程应该立刻看到.C.通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程).D.资源共享

Linux/UNIX进程间的通信(1)

进程间的通信(1) 进程间的通信IPC(InterProcessCommunication )主要有以下不同形式: 半双工管道和FIFO:全双工管道和命名全双工管道:消息队列,信号量和共享存储:套接字和STREAMS 管道 pipe函数 当从一个进程连接到另一个进程时,我们使用术语管道.我们通常是把一个进程的输出通过管道连接到另一个进程的输入. 管道是由调用pipe函数创建的: #include<unistd.h> int pipe(intpipefd[2]); 经由参数pipefd返回两个文

Nginx学习——Nginx进程间的通信

nginx进程间的通信 进程间消息传递 共享内存 共享内存还是Linux下提供的最主要的进程间通信方式,它通过mmap和shmget系统调用在内存中创建了一块连续的线性地址空间,而通过munmap或者shmdt系统调用可以释放这块内存.使用共享内存的优点是当多个进程使用同一块共享内存时,在不论什么一个进程改动了共享内存中的内容后,其它进程通过訪问这段共享内存都可以得到改动后的内容. Nginx定义了ngx_shm_t结构体.用于描写叙述一块共享内存, typedef struct{ //指向共享