无锁编程实战演练

前段时间研究过一阵子无锁化编程。刚写了几个简单的程序,来验证了下自己学到的一些概念。

测试场景:假设有一个应用:现在有一个全局变量,用来计数,再创建10个线程并发执行,每个线程中循环对这个全局变量进行++操作(i++),循环加2000000次。

所以很容易知道,这必然会涉及到并发互斥操作。下面通过三种方式来实现这种并发操作。并对比出其在效率上的不同之处。

这里先贴上代码,共5个文件:2个用于做时间统计的文件:timer.h  timer.cpp。这两个文件是临时封装的,只用来计时,可以不必细看。

timer.h

#ifndef TIMER_H
#define TIMER_H

#include <sys/time.h>
class Timer
{
public:
	Timer();
	// 开始计时时间
	void Start();
	// 终止计时时间
	void Stop();
	// 重新设定
	void Reset();
	// 耗时时间
	void Cost_time();
private:
	struct timeval t1;
	struct timeval t2;
	bool b1,b2;
};
#endif

timer.cpp

#include "timer.h"
#include <stdio.h>

Timer::Timer()
{
	b1 = false;
	b2 = false;
}
void Timer::Start()
{
	gettimeofday(&t1,NULL);
	b1 = true;
	b2 = false;
}

void Timer::Stop()
{
	if (b1 == true)
	{
		gettimeofday(&t2,NULL);
		b2 = true;
	}
}

void Timer::Reset()
{
	b1 = false;
	b2 = false;
}

void Timer::Cost_time()
{
	if (b1 == false)
	{
		printf("计时出错,应该先执行Start(),然后执行Stop(),再来执行Cost_time()");
		return ;
	}
	else if (b2 == false)
	{
		printf("计时出错,应该执行完Stop(),再来执行Cost_time()");
		return ;
	}
	else
	{
		int usec,sec;
		bool borrow = false;
		if (t2.tv_usec > t1.tv_usec)
		{
			usec = t2.tv_usec - t1.tv_usec;
		}
		else
		{
			borrow = true;
			usec = t2.tv_usec+1000000 - t1.tv_usec;
		}

		if (borrow)
		{
			sec = t2.tv_sec-1 - t1.tv_sec;
		}
		else
		{
			sec = t2.tv_sec - t1.tv_sec;
		}
		printf("花费时间:%d秒 %d微秒\n",sec,usec);
	}
}

传统互斥量加锁方式 lock.cpp

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#include "timer.h"

pthread_mutex_t mutex_lock;
static volatile int count = 0;
void *test_func(void *arg)
{
        int i = 0;
        for(i = 0; i < 2000000; i++)
		{
                pthread_mutex_lock(&mutex_lock);
                count++;
                pthread_mutex_unlock(&mutex_lock);
        }
        return NULL;
}

int main(int argc, const char *argv[])
{
	Timer timer; // 为了计时,临时封装的一个类Timer。
	timer.Start();	// 计时开始
	pthread_mutex_init(&mutex_lock, NULL);
	pthread_t thread_ids[10];
	int i = 0;
	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
	{
		pthread_create(&thread_ids[i], NULL, test_func, NULL);
	}

	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
	{
		pthread_join(thread_ids[i], NULL);
	}

	timer.Stop();// 计时结束
	timer.Cost_time();// 打印花费时间
	printf("结果:count = %d\n",count);

	return 0;
}

no lock 不加锁的形式 nolock.cpp

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include "timer.h"

int mutex = 0;
int lock = 0;
int unlock = 1;

static volatile int count = 0;
void *test_func(void *arg)
{
        int i = 0;
        for(i = 0; i < 2000000; i++)
	{
		while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
		 count++;
		 __sync_bool_compare_and_swap (&mutex, unlock, 0);
        }
        return NULL;
}

int main(int argc, const char *argv[])
{
	Timer timer;
	timer.Start();
	pthread_t thread_ids[10];
	int i = 0;

	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
	{
			pthread_create(&thread_ids[i], NULL, test_func, NULL);
	}

	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
	{
			pthread_join(thread_ids[i], NULL);
	}

	timer.Stop();
	timer.Cost_time();
	printf("结果:count = %d\n",count);

	return 0;
}

原子函数进行统计方式 atomic.cpp

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include "timer.h"

static volatile int count = 0;
void *test_func(void *arg)
{
        int i = 0;
        for(i = 0; i < 2000000; i++)
        {
	        __sync_fetch_and_add(&count, 1);
        }
        return NULL;
}

int main(int argc, const char *argv[])
{
	Timer timer;
	timer.Start();
	pthread_t thread_ids[10];
	int i = 0;

	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
			pthread_create(&thread_ids[i], NULL, test_func, NULL);
	}

	for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
			pthread_join(thread_ids[i], NULL);
	}

	timer.Stop();
	timer.Cost_time();
	printf("结果:count = %d\n",count);
    return 0;
}

#################################################################3

好,代码粘贴完毕。下面进入测试环节:

编译:

[[email protected] test3]$ g++ lock.cpp ./timer.cpp  -lpthread -o lock ;

[[email protected] test3]$ g++ nolock.cpp ./timer.cpp  -lpthread -o nolock ;

[[email protected] test3]$  g++ atomic.cpp ./timer.cpp  -lpthread -o atomic ;

每一个线程循环加2000000次。

第一组测验

[[email protected] test3]$ ./lock

花费时间:3秒 109807微秒

结果:count = 20000000

[[email protected] test3]$ ./nolock

花费时间:7秒 595784微秒

结果:count = 20000000

[[email protected] test3]$ ./atomic

花费时间:0秒 381022微秒

结果:count = 20000000

结论:

可以看出,原子操作函数的速度是最快的,其他两种方式根本就没法比。而无锁操作是在原子操作函数的基础上形成的。

为什么无锁操作的效率会这么低?如果效率低的话,那还有什么意义,为什么现在大家都提倡无锁编程呢?为什么?咱先不

解释,先用数据说话。

第二组测验:

原子操作代码不变,加锁操作代码不变。改动一下无锁操作的代码。

将如下代码更改

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ));

更改后:while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);

让他睡一微秒。

为什么要这样改代码?这样启不是会更慢?你的猜测是不无道理的,但是一个不休息的人干的活未必比有休息的人干的活多。

[[email protected] test3]$ ./lock

花费时间:2秒 970773微秒

结果:count = 20000000

[[email protected] test3]$ ./nolock

花费时间:0秒 685404微秒

结果:count = 20000000

[[email protected] test3]$ ./atomic

花费时间:0秒 380675微秒

结果:count = 20000000

结论:

不用明说,大家看到的结果是不是很诧异?是不是!有木有!怎么会是这样。无锁加上usleep(1),睡一会,反而会变得这么快。

虽和原子操作相比次了一点,但已经甩开有锁同步好几条街了,无锁比有锁快是应该的,但为什么睡一会会更快,不睡就比有锁

还慢那么多呢?怎么回事。是不是这个测试的时候cpu出现了不稳定的事情。

那好,那再测试几次。

[[email protected] test3]$ ./nolock

花费时间:0秒 684938微秒

结果:count = 20000000

[[email protected] test3]$ ./nolock

花费时间:0秒 686039微秒

结果:count = 20000000

[[email protected] test3]$ ./nolock

花费时间:0秒 685928微秒

结果:count = 20000000

现在总没话可说了,这是事实!但为什么,我也不会解释。

很好奇,为什么越休息,效率越高。电脑是机器,它可不是人。怎么会这样?

那我就让它多休息一会:

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10); //之前是1,现在改成10了。

下面就再单独对比一个nolock无锁方式。

[[email protected] test3]$ ./nolock //usleep(1);

花费时间:0秒 686039微秒

结果:count = 20000000

[[email protected] test3]$ ./nolock //usleep(10);

花费时间:0秒 680307微秒

结果:count = 20000000

nolock,结果usleep(10)居然比uleep(1)还要快一点。

那么这样呢:

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100); //之前是10,现在改成100了。

[[email protected] test3]$ ./nolock //usleep(100)

花费时间:0秒 661935微秒

结果:count = 20000000

还是睡的越久,效率越高。

那我再试一下usleep(1000)

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000); //之前是100,现在改成1000了。

[[email protected] test3]$ ./nolock // usleep(1000);

花费时间:0秒 652411微秒

结果:count = 20000000

还是睡的越久,效率越高。

那我再试一下usleep(10000)

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000); //之前是1000,现在改成10000了。

[[email protected] test3]$ ./nolock

花费时间:0秒 626267微秒

结果:count = 20000000

还是睡的越久,效率越高。

那我再试一下usleep(100000)

while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000); //之前是10000,现在改成100000了,也就是0.1秒。

[[email protected] test3]$ ./nolock

花费时间:0秒 942445微秒

结果:count = 20000000

哦,现在开始速度慢了。

执行环境:

gcc版本信息:

[[email protected] test3]$ g++ -v

Using built-in specs.

Target: x86_64-redhat-linux

gcc version 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)

cpu信息:

[[email protected] test3]$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c

4  Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz

通过编程测试及测试得出结论:

1、如果是想用全局变量来做统计操作。而又不得不考虑多线程间的互斥访问的话,最好使用编译器支持的原子操作函数。

再满足互斥访问的前提下,编程最简单,效率最高。

2、lock-free,无锁编程方式确实能够比传统加锁方式效率高,经上面测试可以发现,可以快到5倍左右。所以在高并发程序中

采用无锁编程的方式可以进一步提高程序效率。

3、但是,得对无锁方式有足够熟悉的了解,不然效率反而会更低。而且容易出错。

4、没想明白的疑问:为什么上面的循环检测时,加uleep比不加,效率更高。为什么在一定程度上,usleep越久效率越高?

请高手路过的时候,为小弟解答一下。谢谢。

无锁编程实战演练

时间: 2024-08-04 21:38:19

无锁编程实战演练的相关文章

无锁编程

无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization). 实现非阻塞同步的方案称为"无锁编程算法"( Non-blocking algorithm). 多线程编程条件下,多个线程需要对同一共享变量写操作时,一般使用互斥锁来解决竞争问题,如下: 1 extern int g_var; 2 3 mutex_lock; 4 g_var ++; 5 mutex_unloc

无锁编程:lock-free原理;CAS;ABA问题

转自:http://blog.csdn.net/kangroger/article/details/47867269 定义 无锁编程是指在不使用锁的情况下,在多线程环境下实现多变量的同步.即在没有线程阻塞的情况下实现同步.这样可以避免竞态.死锁等问题. 原理 CAS是指Compare-and-swap或Compare-and-Set CAS是一个原子操作,用于多线程环境下的同步.它比较内存中的内容和给定的值,只有当两者相同时(说明其未被修改),才会修改内存中的内容. 实现如下: int comp

4.锁--无锁编程以及CAS

无锁编程以及CAS 无锁编程 / lock-free / 非堵塞同步 无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被堵塞的情况下实现变量的同步,所以也叫非堵塞同步(Non-blocking Synchronization). 实现非堵塞同步的方案称为"无锁编程算法"( Non-blocking algorithm). lock-free是眼下最常见的无锁编程的实现级别(一共三种级别). 为什么要 Non-blocking sync ? 使用lock实现线程同步

海量并发的无锁编程 (lock free programming)

最近在做在线架构的实现,在线架构和离线架构近线架构最大的区别是服务质量(SLA,Service Level Agreement,SLA 99.99代表10K的请求最多一次失败或者超时)和延时.而离线架构在意的是吞吐,SLA的不会那么严苛,比如99.9.离线架构一般要有流控,以控制用户发送请求的速度.以免多于服务端处理能力的请求造成大量的数据在buffer或者队列里堆积,造成大量的超时.在线架构不可能有流控了,你不能限制用户的请求.因此在线架构对于弹性扩容有很高的要求,在大量请求到来时自动扩展后台

[转载] 无锁编程本质论

原文: http://weibo.com/p/1001603876869958445266 作者:新浪微博(@NP等不等于P) 计算机学习微信公众号(jsj_xx) 无锁编程真的是不涉及锁么?无锁编程实现的本质是什么?需要操作系统或者编译器的支持么?本文尝试解答这些问题. 1 锁引发的问题 使用锁时要特别防止出现死锁或活锁.死锁的情况很简单,就是申请者在申请过程中由于顺序原因(多个锁没有按固定顺序申请)进入堵塞状态了,指定顺序即可规避.我们只看一个活锁的例子: 两个线程都在尽量避免死锁,但是却有

无锁编程与有锁编程的性能对比与分析

最近维护的一个网络服务器遇到性能问题,于是就对原有的程序进行了较大的框架改动.改动最多的是线程工作模式与数据传递方式,最终的结果是改变锁的使用模式.经过一番改进,基本上可以做到 GMb 网卡全速工作处理.在 性能达标之后,一度在想有没有什么办法使用更加轻量级锁,或者去掉锁的使用,为此搜索一些相关的研究成果,并做了一些实验来验证这些成果,因而就有这篇文章.希望有做类似工作的同行可以有所借鉴.如果有人也有相关的经验,欢迎和我交流. 1 无锁编程概述 本节主要对文献 [1] 进行概括,做一些基础知识的

无锁编程以及CAS

无锁编程以及CAS 无锁编程 / lock-free / 非阻塞同步 无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization). 实现非阻塞同步的方案称为"无锁编程算法"( Non-blocking algorithm). lock-free是目前最常见的无锁编程的实现级别(一共三种级别). 为什么要 Non-blocking sync ? 使用lock实现线程同步

透过 Linux 内核看无锁编程

非阻塞型同步 (Non-blocking Synchronization) 简介 如何正确有效的保护共享数据是编写并行程序必须面临的一个难题,通常的手段就是同步.同步可分为阻塞型同步(Blocking Synchronization)和非阻塞型同步( Non-blocking Synchronization). 阻塞型同步是指当一个线程到达临界区时,因另外一个线程已经持有访问该共享数据的锁,从而不能获取锁资源而阻塞,直到另外一个线程释放锁.常见的同步原语有 mutex.semaphore 等.如

无锁编程:采用不可变类减少锁的使用

很多的同学很少使用.或者干脆不了解不可变类(Immutable Class).直观上很容易认为Immutable类效率不高,或者难以理解他的使用场景.其实不可变类是非常有用的,可以提高并行编程的效率和优化设计.让我们跳过一些宽泛的介绍,从一个常见的并行编程场景说起: 假设系统需要实时地处理大量的订单,这些订单的处理依赖于用户的配置,例如用户的会员级别.支付方式等.程序需要通过这些配置的参数来计算订单的价格.而用户配置同时被另外一些线程更新.显然,我们在订单计算的过程中保持配置的一致性. 上面的例