基于无锁队列和c++11的高性能线程池

基于无锁队列和c++11的高性能线程池
线程使用c++11库
和线程池之间的消息通讯使用一个简单的无锁消息队列
适用于linux平台,gcc 4.6以上

标签: <无>

代码片段(6)[全屏查看所有代码]

1. [代码]lckfree.h

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

// lckfree.h

// Implementation of lock free queue using CAS operations

// for simple multi-threading use cases like:

// 1. multiple worker to process incoming messages

// 2. async processing using a thread pool

// 3. simple tcp server deal with async requests

// Author: [email protected]

// Refrence: http://coolshell.cn/articles/8239.html

#ifndef __LCKFREE_H__

#define __LCKFREE_H__

#include <string>

using namespace std;

namespace bfd {

struct LinkNode {

  string data;

  LinkNode* next;

};

typedef struct LinkNode LinkNode;

class LckFreeQueue {

 public:

  LckFreeQueue();

  ~LckFreeQueue();

  int push(const string &msg);

  string pop();  // non-block pop method

//  string bpop(); // block pop method

  bool empty();

 private:

  LinkNode * head_;

  LinkNode * tail_;

  bool empty_;

  unsigned int length_;

};

} // namespace bfd

#endif

2. [代码]lckfree.cpp

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

#include <lckfree.h>

namespace bfd {

LckFreeQueue::LckFreeQueue(): head_(NULL), tail_(NULL), empty_(true), length_(0) {

  head_ = new LinkNode;

  head_->next = NULL;

  tail_ = head_;

}

LckFreeQueue::~LckFreeQueue() {

  LinkNode *p = head_;

  if (p) {

    LinkNode *q = p->next;

    delete p;

    p = q;

  }

}

int LckFreeQueue::push(const string &msg) {

  LinkNode * q = new LinkNode;

  q->data = msg;

  q->next = NULL;

  LinkNode * p = tail_;

  LinkNode * oldp = p;

  do {

    while (p->next != NULL)

        p = p->next;

  } while( __sync_bool_compare_and_swap(&(p->next), NULL, q) != true); //如果没有把结点链在尾上,再试

  __sync_bool_compare_and_swap(&tail_, oldp, q); //置尾结点

  return 0;

}

string LckFreeQueue::pop() {

  LinkNode * p;

  do{

    p = head_;

    if (p->next == NULL){

      return "";

    }

  } while( __sync_bool_compare_and_swap(&head_, p, p->next) != true );

  return p->next->data;

}

bool LckFreeQueue::empty() {

  return empty_;

}

}

3. [代码]workthreadpool.h

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

// workthreadpool.h

// 一个用于将消息分发给多个进程,并使用多个进程处理的库,工作进程并不返回数据

#ifndef __WORK_THREAD_POOL__

#define __WORK_THREAD_POOL__

#include <stdio.h>

#include <thread>

#include <queue>

#include <string>

#include <vector>

#include "lckfree.h"

using namespace std;

namespace bfd {

class WorkThreadPool {

 public:

  WorkThreadPool(int size);

  virtual ~WorkThreadPool();

  // 需要子类继承并实现的函数,每个线程实际执行的内容

  virtual void Init() {};

  virtual void Finish() {};

  virtual void Handle(const string &msg)=0;

  // 将消息放入处理队列, 消息只支持string类型

  int SendMessage(const string &msg);

  int Start();

  int Stop();

 private:

  void Worker();

  int size_;

  LckFreeQueue msg_queue_; // 线程池的协作基于这个无锁队列

  vector<thread> thread_pool_;

};

} // namespace

#endif

4. [代码]workthreadpool.cpp

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

#include "workthreadpool.h"

#include <sstream>

#include <unistd.h>

namespace bfd {

WorkThreadPool::WorkThreadPool(int size) {

  if (size <= 0) { // 最小也需要有1个线程

    size_ = 1;

  } else {

    size_ = size;

  }

}

WorkThreadPool::~WorkThreadPool() {

}

int WorkThreadPool::SendMessage(const string &msg) {

  msg_queue_.push(msg);

  return 0;

}

void WorkThreadPool::Worker() {

  unsigned int msg_count = 0;

  while (1) {

    string msg = msg_queue_.pop();

    if (msg.empty()) {

      printf("no msg got, sleep for 0.1 sec\n");

      usleep(100000); // 0.1 sec

      continue;

    }

    if (msg == "__exit__") {

      stringstream ss;

      ss << "exit worker: " << std::this_thread::get_id() << ", processed: " << msg_count << "..";

      printf("%s\n", ss.str().c_str());

      return;

    }

    Handle(msg);

    msg_count++;

    if (msg_count % 1000 == 0) {

      printf("every 1000 msg count\n");

    }

  }

}

int WorkThreadPool::Start() {

  for (int i=0; i < size_; i++) {

    thread_pool_.push_back( thread(&WorkThreadPool::Worker, this) );

  }

  return 0;

}

int WorkThreadPool::Stop() {

  for (int i=0; i < size_; i++) {

    SendMessage("__exit__");

  }

  for (int i=0; i < size_; i++) {

    thread_pool_[i].join();

  }

  return 0;

}

}

5. [代码]main.cpp

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

#include "workthreadpool.h"

#include <sstream>

#include <math.h>

class MyThreadPool : public bfd::WorkThreadPool {

 public:

  MyThreadPool(int size) : bfd::WorkThreadPool(size) {

  }

  void Handle(const string &msg) {

    stringstream ss;

    ss << "worker (" << std::this_thread::get_id() << ") got msg: " << msg;

    printf("%s\n", ss.str().c_str());

    for (int i=0; i<=999999; i++) {

      double result = sqrt(sqrt(i) / 93.234);

    }

  }

};

int main() {

  printf("start running ....\n");

  MyThreadPool pool(5);

  pool.Start();

  for (int i=0; i<100; i++) {

    pool.SendMessage("msg info ----------");

  }

  pool.Stop();

  return 0;

}

6. [代码]Makefile

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

LIB_SRC_FILES = src/workthreadpool.cpp src/lckfree.cpp

TEST_SRC_FILES = src/main.cpp

INCLUDE_DIR = src

STD_FLAG = -std=c++0x

all: main.o libs

    g++ $(STD_FLAG) -o test_workthreadpool main.o libworkthreadpool.so -lpthread

    

main.o: $(TEST_SRC_FILES)

    g++ $(STD_FLAG) -c $(TEST_SRC_FILES) -I$(INCLUDE_DIR)

libs: $(LIB_SRC_FILES)

    g++ $(STD_FLAG) -o libworkthreadpool.so -fPIC -O2 -shared -Wl,--no-as-needed -Isrc $(LIB_SRC_FILES) -lpthread

.PHONY : clean

clean :

    rm -f test_workthreadpool main.o libworkthreadpool.so

举报

原文地址:https://www.cnblogs.com/lidabo/p/9767068.html

时间: 2024-11-04 08:13:24

基于无锁队列和c++11的高性能线程池的相关文章

基于循环数组的无锁队列

在之前的两篇博客(线程安全的无锁RingBuffer的实现,多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程.一个写线程的情况下,以及只有一个写线程.两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现.但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现.本文实现了一种基于循环数组和原子运算的无锁队列.采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率.之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可

【DPDK】【ring】从DPDK的ring来看x86无锁队列的实现

[前言] 队列是众多数据结构中最常见的一种之一.曾经有人和我说过这么一句话,叫做“程序等于数据结构+算法”.因此在设计模块.写代码时,队列常常作为一个很常见的结构出现在模块设计中.DPDK不仅是一个加速网络IO的框架,其内部还提供众多的功能组件,rte_ring就是DPDK内部提供的一种无锁队列,本篇文章将从使用的角度出发阐述DPDK的ring怎么用?在怎么用的角度上再来阐述ring无锁的实现,最后将探讨实现无锁队列的关键以及在不通平台上如何实现,本文将会探讨x86平台下无锁队列的实现. 权当抛

多个写线程一个读线程的无锁队列实现

在之前的一篇博客中,写了一个在特殊情况下,也就是只有一个读线程和一个写线程的情况下,的无锁队列的实现.其中甚至都没有利用特殊的原子加减操作,只是普通的运算.这样做的原因是,即使是特殊的原子加减操作,也比普通的加减运算复杂度高很多.因此文中的实现方法可以达到很高的运行效率. 但是,有的情况下并不是只有一个读线程和一个写线程.越是一般化的实现,支持的情况越多,但是往往损失的性能也越多.作者看到过一个实现(http://www.oschina.net/code/snippet_732357_13465

DIOCP开源项目-高效稳定的服务端解决方案(DIOCP + 无锁队列 + ZeroMQ + QWorkers) 出炉了

[概述] 自从上次发布了[DIOCP开源项目-利用队列+0MQ+多进程逻辑处理,搭建稳定,高效,分布式的服务端]文章后,得到了很多朋友的支持和肯定.这加大了我的开发动力,经过几个晚上的熬夜,终于在昨天晚上,DEMO基本成型,今天再加入了QWorkers来做逻辑处理进程,进一步使得逻辑处理进程更加方便和高效.今天特意写篇blog来记录我的心得与大家分享. [功能实现说明] 沿用上次的草图 目前DEMO图上的功能都已经实现.下面谈谈各部分的实现. 通信服务, 由DIOCP实现,担当与客户端的通信工作

C++ boost库无锁队列多线程并行测试与编译方法

阅读了网络中关于Boost库无锁队列的源代码,但却缺少编译方法.经过测试,确定了ubuntu 14.04中编译boost库的方法,特做记录. 无锁(free-lock)是实现高性能多线程并发编程的重要技术. 作为C++11 STL参考实现的boost库,不仅支持11标准,而且做了许多扩展,掌握其使用方法,对于提高代码质量,尤其重要. 以其多线程并行无锁队列为例,结合代码和说明,演示了无锁boost库的使用和编译方法. 代码及说明如下: //source: boost_queue.cpp //目的

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

转载:无锁队列的实现(CAS同步)

转自:http://coolshell.cn/articles/8239.html 关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术.下面开始正文. 关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.

并发无锁队列

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

锁、CAS操作和无锁队列的实现

https://blog.csdn.net/yishizuofei/article/details/78353722 锁的机制 锁和人很像,有的人乐观,总会想到好的一方面,所以只要越努力,就会越幸运:有的人悲观,总会想到不好的一方面,患得患失,所以经常会做不好事.我一直把前一个当作为我前进的动力和方向,快乐充实的过好每一天. 常用的锁机制也有两种: 1.乐观锁:假设不会发生并发冲突,每次不加锁而去完成某项操作,只在提交操作时,检查是否违反数据完整性.如果因为冲突失败就继续重试,直到成功为止.而乐