php模拟并发

原文: http://blog.csdn.net/zhang_xinglong/article/details/16339867

-------------------------------------------------------------------------------------------------------------------------------

并发请求理论描述:假设有一个client,程序逻辑是要请求三个不同的server,处理各自的响应。传统模型当然是顺序执行,先发送第一个请求,等待收到响应数据后再发送第二个请求,以此类推。就像是单核CPU,一次只能处理一件事,其他事情被暂时阻塞。而并发模式可以让三个server同时处理各自请求,这就可以使大量时间复用。

画个图更好说明问题:

前者为阻塞模式,忽略请求响应等时间,总耗时为700ms;而后者非阻塞模式,由于三个请求可以同时得到处理,总耗时只有300ms。所谓阻塞方式block,顾名思义,就是进程或是线程执行到这些函数时必须等待某个事件的发生,如果事件没有发生,进程或线程就被阻塞,函数不能立即返回。所谓非阻塞方式non-block,就是进程或线程执行此函数时不必非要等待事件的发生,一旦执行肯定返回,以返回值的不同来反映函数的执行情况,如果事件发生则与阻塞方式相同,若事件没有发生则返回一个代码来告知事件未发生,而进程或线程继续执行,所以效率较高。

PHP本身是不支持多线程的,但是它可以利用Linux和apache的多线程能力。php模拟的多线程其实只是多进程,并不是真正的多线程。以下是几种php模拟多线程的方法:
1.php+shell (利用linux os)

php代码(test.php):

[php] view plain copy

  1. <?php
  2. for($i = 0; $i < 10; $i++)
  3. {
  4. echo $i;
  5. sleep(5); //这里为了方便看效果sleep一下让脚本执行时间更长
  6. }
  7. ?>

shell代码(test.sh):

[plain] view plain copy

  1. #!/bin/bash
  2. for i in 1 2 3 4 5
  3. do
  4. /usr/bin/php -r -q test.php &
  5. done

注意:
在请求php代码的那行末尾有一个&符号,这个是关键,不加的话是不能进行多线程的,&表示将服务推送到后台执行,因此在shell的每次的循环中不必等php的代码全部执行完在请求下一个文件,而是同时进行的,这样就实现了多线程,下面运行下shell看下效果,这里你将看到5个test.php进程,再利用linux的定时器,定时请求这个shell,在处理一些需要多线程的任务,例如,批量下载时,非常好用!
参考:http://blog.csdn.net/tianmohust/article/details/8208627

2.php+pcntl(利用linux os)
只能用在Unix Like OS,Windows不可用。且推荐仅仅在CLI模式运行,不要在WEB服务器环境运行。

[php] view plain copy

  1. <?php
  2. declare(ticks=1);
  3. //是否等待进程结束
  4. $bWaitFlag = FALSE;
  5. //进程总数
  6. $intNum = 10;
  7. //进程PID数组
  8. $pids = array();
  9. echo ("Start\n");
  10. for($i = 0; $i < $intNum; $i++)
  11. {
  12. //产生子进程,而且从当前行之下开试运行代码,而且不继承父进程的数据信息
  13. $pids[$i] = pcntl_fork();
  14. if( ! $pids[$i])
  15. {
  16. //子进程进程代码段_Start
  17. $str = "";
  18. sleep(5+$i);
  19. for ($j = 0; $j < $i; $j++)
  20. {
  21. $str .= "*";
  22. }
  23. echo "$i -> " . time() . " $str \n";
  24. exit();
  25. //子进程进程代码段_End
  26. }
  27. }
  28. if ($bWaitFlag)
  29. {
  30. for($i = 0; $i < $intNum; $i++)
  31. {
  32. pcntl_waitpid($pids[$i], $status, WUNTRACED);
  33. echo "wait $i -> " . time() . "\n";
  34. }
  35. }
  36. echo ("End\n");
  37. ?>

运行结果如下:

[plain] view plain copy

  1. [[email protected] qiao]$ php test.php
  2. Start
  3. End
  4. [[email protected] qiao]$ ps -aux | grep "php"
  5. qiao      32275   0.0   0.5 49668 6148 pts/1     S     14:03    0:00 /usr/local/php4/b
  6. qiao      32276   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  7. qiao      32277   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  8. qiao      32278   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  9. qiao      32279   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  10. qiao      32280   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  11. qiao      32281   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  12. qiao      32282   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  13. qiao      32283   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  14. qiao      32284   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b
  15. qiao      32286   0.0   0.0   1620   600 pts/1     S     14:03    0:00 grep php
  16. [[email protected] qiao]$ 0 -> 1133503401
  17. 1 -> 1133503402 *
  18. 2 -> 1133503403 **
  19. 3 -> 1133503404 ***
  20. 4 -> 1133503405 ****
  21. 5 -> 1133503406 *****
  22. 6 -> 1133503407 ******
  23. 7 -> 1133503408 *******
  24. 8 -> 1133503409 ********
  25. 9 -> 1133503410 *********
  26. [[email protected] qiao]$

如果$bWaitFlag=TURE,则结果如下:

[plain] view plain copy

  1. [[email protected] qiao]$ php test.php
  2. Start
  3. 0 -> 1133503602
  4. wait 0 -> 1133503602
  5. 1 -> 1133503603 *
  6. wait 1 -> 1133503603
  7. 2 -> 1133503604 **
  8. wait 2 -> 1133503604
  9. 3 -> 1133503605 ***
  10. wait 3 -> 1133503605
  11. 4 -> 1133503606 ****
  12. wait 4 -> 1133503606
  13. 5 -> 1133503607 *****
  14. wait 5 -> 1133503607
  15. 6 -> 1133503608 ******
  16. wait 6 -> 1133503608
  17. 7 -> 1133503609 *******
  18. wait 7 -> 1133503609
  19. 8 -> 1133503610 ********
  20. wait 8 -> 1133503610
  21. 9 -> 1133503611 *********
  22. wait 9 -> 1133503611
  23. End
  24. [[email protected] qiao]$

从多进程的例子可以看出,使用pcntl_fork()之后,将生成一个子进程,而且子进程运行的代码,从pcntl_fork()之后的代码开始,而子进程不继承父进程的数据信息(实际上是把父进程的数据做了一个全新的拷贝),因而使用if(!$pids[$i]) 来控制子进程实际运行的代码段。
参考:http://hi.baidu.com/tangyubinsir/item/43c04f85ea7709d4d1f8cd84和http://www.itlearner.com/article/4908
3.php+pthreads
参考:http://blog.csdn.net/leinchu/article/details/11795985
4.php+socket(利用web server)
假设你要建立一个服务来检查正在运行的n台服务器,以确定他们还在正常运转。你可能会写下面这样的代码:

[php] view plain copy

  1. <?php
  2. $hosts = array("www.baidu.com", "www.sohu.com", "www.163.com");
  3. $timeout = 15;
  4. $status = array();
  5. foreach ($hosts as $host)
  6. {
  7. $errno = 0;
  8. $errstr = "";
  9. $s = fsockopen($host, 80, $errno, $errstr, $timeout);
  10. if ($s)
  11. {
  12. $status[$host] = "Connected\n";
  13. fwrite($s, "HEAD / HTTP/1.0\r\nHost: $host\r\n\r\n"); //第二个参数是HTTP协议中规定的请求头,不明白的请看RFC中的定义
  14. do
  15. {
  16. $data = fread($s, 8192);
  17. if (strlen($data) == 0)
  18. {
  19. break;
  20. }
  21. $status[$host] .= $data; //返回连接状态
  22. }
  23. while (true);
  24. fclose($s);
  25. }
  26. else
  27. {
  28. $status[$host] = "Connection failed: $errno $errstrn";
  29. }
  30. }
  31. echo ‘<pre>‘;
  32. print_r($status);
  33. ?>

它运行的很好,但是在fsockopen()分析完hostname并且建立一个成功的连接(或者延时$timeout秒)之前,扩充这段代码来管理大量服务器将耗费很长时间。
因此我们必须放弃这段代码;我们可以建立异步连接-不需要等待fsockopen返回连接状态。PHP仍然需要解析hostname(所以直接使用ip更加明智),不过将在打开一个连接之后立刻返回,继而我们就可以连接下一台服务器。
有两种方法可以实现;PHP5中可以使用新增的stream_socket_client()函数直接替换掉fsocketopen()。PHP5之前的版本,你需要自己动手,用sockets扩展解决问题。
下面是PHP5中的解决方法:

[php] view plain copy

  1. <?php
  2. $hosts = array("www.baidu.com", "www.sohu.com", "www.163.com");
  3. $timeout = 15;
  4. $status = array();
  5. $sockets = array();
  6. /* Initiate connections to all the hosts simultaneously */
  7. foreach ($hosts as $id => $host)
  8. {
  9. $s = stream_socket_client(
  10. "$host:80", $errno, $errstr, $timeout,
  11. TREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
  12. /* 这里需要稍微延迟一下,否则下面fwrite中的socket句柄不一定能真正使用
  13. * 这里应该是PHP的一处bug,查了一下,官方bug早在08年就有人提交了
  14. * 我的5.2.8中尚未解决,不知最新的5.3中是否修正
  15. */
  16. usleep(10);
  17. if ($s)
  18. {
  19. $sockets[$id] = $s;
  20. $status[$hosts[$id]] = "in progress";
  21. }
  22. else
  23. {
  24. $status[$hosts[$id]] = "failed, $errno $errstr";
  25. }
  26. }
  27. /* Now, wait for the results to come back in */
  28. while (count($sockets))
  29. {
  30. $read = $write = $sockets;
  31. //     $e = null;
  32. /* This is the magic function - explained below */
  33. $n = stream_select($read, $write, $e, $timeout);
  34. if ($n > 0) //据说stream_select返回值不总是可信任的
  35. //     if (count($read))
  36. {
  37. /* readable sockets either have data for us, or are failed connection attempts */
  38. foreach ($read as $r)
  39. {
  40. /* stream_select generally shuffles $read, so we need to
  41. compute from which socket(s) we‘re reading. */
  42. $id = array_search($r, $sockets);
  43. $data = fread($r, 8192);
  44. /* A socket is readable either because it has
  45. data to read, OR because it‘s at EOF. */
  46. if (strlen($data) == 0)
  47. {
  48. if ($status[$hosts[$id]] == "in progress")
  49. {
  50. $status[$hosts[$id]] = "failed to connect";
  51. }
  52. fclose($r);
  53. unset($sockets[$id]);
  54. }
  55. else
  56. {
  57. $status[$hosts[$id]] = $data;
  58. }
  59. }
  60. /* writeable sockets can accept an HTTP request */
  61. foreach ($write as $w)
  62. {
  63. $id = array_search($w, $sockets);
  64. if(is_resource($w) && feof($w) === FALSE)
  65. {
  66. @fwrite($w, "HEAD / HTTP/1.0\r\nHost: " . $hosts[$id] .  "\r\n\r\n");
  67. //                     $flag && $status[$hosts[$id]] = "waiting for response";
  68. }
  69. }
  70. }
  71. else
  72. {
  73. /* timed out waiting; assume that all hosts associated with $sockets are faulty */
  74. foreach ($sockets as $id => $s)
  75. {
  76. $status[$hosts[$id]] = "timed out " . $status[$hosts[$id]];
  77. }
  78. break;
  79. }
  80. }
  81. echo ‘<pre>‘;var_dump($status);
  82. ?>

我们用stream_select()等待sockets打开的连接事件。stream_select()调用系统的select()函数来工作:前面三个参数是你要使用的streams的数组;你可以对其读取,写入和获取异常(分别针对三个参数)。stream_select()可以通过设置$timeout(秒)参数来等待事件发生-事件发生时,相应的sockets数据将写入你传入的参数。
下面是PHP4.1.0之后版本的实现,如果你已经在编译PHP时包含了sockets(ext/sockets)支持,你可以使用根上面类似的代 码,只是需要将上面的streams/filesystem函数的功能用ext/sockets函数实现。主要的不同在于我们用下面的函数代替 stream_socket_client()来建立连接:

[php] view plain copy

  1. <?php
  2. // This value is correct for Linux, other systems have other values
  3. define(‘EINPROGRESS‘, 115);
  4. function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) {
  5. $ip = gethostbyname($host);
  6. $s = socket_create(AF_INET, SOCK_STREAM, 0);
  7. if (socket_set_nonblock($s)) {
  8. $r = @socket_connect($s, $ip, $port);
  9. if ($r || socket_last_error() == EINPROGRESS) {
  10. $errno = EINPROGRESS;
  11. return $s;
  12. }
  13. }
  14. $errno = socket_last_error($s);
  15. $errstr = socket_strerror($errno);
  16. socket_close($s);
  17. return false;
  18. }
  19. ?>

现在用socket_select()替换掉stream_select(),用socket_read()替换掉fread(),用socket_write()替换掉fwrite(),用socket_close()替换掉fclose()就可以执行脚本了! PHP5的先进之处在于,你可以用stream_select()处理几乎所有的stream。例如你可以通过include STDIN用它接收键盘输入并保存进数组,你还可以接收通过proc_open()打开的管道中的数据。
注:select在socket编程中还是比较重要的,可是对于初学socket的人来说都不太爱用select写程序,他们只是习惯写诸如connect、 accept、recv或recvfrom这样的阻塞程序。可是使用select就可以完成非阻塞方式工作的程序,它能够监视我们需要监视的文件描述符的变化情况——读写或是异常。
参考:http://blog.csdn.net/21aspnet/article/details/7420024
5.php+curl
(1)经典curl并发机制和存在问题
经典的cURL实现机制在网上很容易找到, 比如参考PHP在线手册的如下实现方式:

[php] view plain copy

  1. <?php
  2. function classic_curl($urls, $delay) {
  3. $queue = curl_multi_init();
  4. $map = array();
  5. foreach ($urls as $url) {
  6. // create cURL resources
  7. $ch = curl_init();
  8. // set URL and other appropriate options
  9. curl_setopt($ch, CURLOPT_URL, $url);
  10. curl_setopt($ch, CURLOPT_TIMEOUT, 1);
  11. curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
  12. curl_setopt($ch, CURLOPT_HEADER, 0);
  13. curl_setopt($ch, CURLOPT_NOSIGNAL, true);
  14. // add handle
  15. curl_multi_add_handle($queue, $ch);
  16. $map[$url] = $ch;
  17. }
  18. $active = null;
  19. // execute the handles
  20. do {
  21. $mrc = curl_multi_exec($queue, $active);
  22. } while ($mrc == CURLM_CALL_MULTI_PERFORM);
  23. while ($active > 0 && $mrc == CURLM_OK) {
  24. if (curl_multi_select($queue, 0.5) != -1) {
  25. do {
  26. $mrc = curl_multi_exec($queue, $active);
  27. } while ($mrc == CURLM_CALL_MULTI_PERFORM);
  28. }
  29. }
  30. $responses = array();
  31. foreach ($map as $url=>$ch) {
  32. $responses[$url] = callback(curl_multi_getcontent($ch), $delay);
  33. curl_multi_remove_handle($queue, $ch);
  34. curl_close($ch);
  35. }
  36. curl_multi_close($queue);
  37. return $responses;
  38. }
  39. ?>

首先将所有的URL压入并发队列, 然后执行并发过程, 等待所有请求接收完之后进行数据的解析等后续处理. 在实际的处理过程中, 受网络传输的影响, 部分URL的内容会优先于其他URL返回, 但是经典cURL并发必须等待最慢的那个URL返回之后才开始处理, 等待也就意味着CPU的空闲和浪费. 如果URL队列很短, 这种空闲和浪费还处在可接受的范围, 但如果队列很长, 这种等待和浪费将变得不可接受.
(2)改进的rolling curl并发方式
仔细分析不难发现经典cURL并发还存在优化的空间, 优化的方式时当某个URL请求完毕之后尽可能快的去处理它, 边处理边等待其他的URL返回, 而不是等待那个最慢的接口返回之后才开始处理等工作, 从而避免CPU的空闲和浪费. 闲话不多说, 下面贴上具体的实现:

[php] view plain copy

  1. <?php
  2. function rolling_curl($urls, $delay) {
  3. $queue = curl_multi_init();
  4. $map = array();
  5. foreach ($urls as $url) {
  6. $ch = curl_init();
  7. curl_setopt($ch, CURLOPT_URL, $url);
  8. curl_setopt($ch, CURLOPT_TIMEOUT, 1);
  9. curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
  10. curl_setopt($ch, CURLOPT_HEADER, 0);
  11. curl_setopt($ch, CURLOPT_NOSIGNAL, true);
  12. curl_multi_add_handle($queue, $ch);
  13. $map[(string) $ch] = $url;
  14. }
  15. $responses = array();
  16. do {
  17. while (($code = curl_multi_exec($queue, $active)) == CURLM_CALL_MULTI_PERFORM) ;
  18. if ($code != CURLM_OK) { break; }
  19. // a request was just completed -- find out which one
  20. while ($done = curl_multi_info_read($queue)) {
  21. // get the info and content returned on the request
  22. $info = curl_getinfo($done[‘handle‘]);
  23. $error = curl_error($done[‘handle‘]);
  24. $results = callback(curl_multi_getcontent($done[‘handle‘]), $delay);
  25. $responses[$map[(string) $done[‘handle‘]]] = compact(‘info‘, ‘error‘, ‘results‘);
  26. // remove the curl handle that just completed
  27. curl_multi_remove_handle($queue, $done[‘handle‘]);
  28. curl_close($done[‘handle‘]);
  29. }
  30. // Block for data in / output; error handling is done by curl_multi_exec
  31. if ($active > 0) {
  32. curl_multi_select($queue, 0.5);
  33. }
  34. } while ($active);
  35. curl_multi_close($queue);
  36. return $responses;
  37. }
  38. ?>

(3)两种并发实现的性能对比
性能测试中用到的回调函数为:

[php] view plain copy

  1. function callback($data, $delay) {
  2. preg_match_all(‘/<h3>(.+)<\/h3>/iU‘, $data, $matches);
  3. usleep($delay);
  4. return compact(‘data‘, ‘matches‘);
  5. }

数据处理回调无延迟时: Rolling Curl略优, 但性能提升效果不明显.数据处理回调延迟5毫秒: Rolling Curl完胜, 性能提升40%左右.通过上面的性能对比, 在处理URL队列并发的应用场景中Rolling cURL应该是更加的选择, 并发量非常大(1000+)时, 可以控制并发队列的最大长度, 比如20, 每当1个URL返回并处理完毕之后立即加入1个尚未请求的URL到队列中, 这样写出来的代码会更加健壮, 不至于并发数太大而卡死或崩溃.

时间: 2024-11-07 01:01:32

php模拟并发的相关文章

C# 模拟并发

每次写博客,第一句话都是这样的:程序员很苦逼,除了会写程序,还得会写博客! 当然,题外话说多了,咱进入正题! 在处理大数据的时候,经常会发生并发,并发的情况发生后,会出现数据污读,从而产生脏数据. 首先通过一段程序进行说明.<有兴趣的小伙伴可以复制粘贴这段程序>. 项目背景:模拟大转盘抽奖程序. 场下坐有近万名群众,他们在同一时刻同时抽奖,奖品分为一等奖:奔驰汽车10辆,二等奖:别克汽车20辆,三等奖:现代汽车30辆.(奖品信息存入数据库) 奖品信息如下(数据库部分): create tabl

linux系统里模拟并发请求siege

siege压力测试,siege会将接口进行模拟并发,返回每秒的并发数! 一.siege的安装下载:wget 一.siege的安装下载:wget http://download.joedog.org/siege/siege-latest.tar.gz http://www.joedog.org/pub/siege/siege-latest.tar.gz解压: tar -zxvf siege-latest.tar.gzcd siege-..*./configure --prefix=/usr/loc

shell模拟并发执行

参考: http://www.51testing.com/html/28/116228-238978.html http://blog.chinaunix.net/uid-27571599-id-3473078.html         在bash中,使用后台任务来实现任务的多进程化.在不加控制的模式下,不管有多少任务,全部都后台执行.也就是说,在这种情况下,有多少任务就有多少"进程"在同时执行.   实例一:正常脚本(脚本功能:查看一个文件中的IP列表循环测试主机连通性) [[ema

【转】Jmeter压力测试模拟并发

jmeter下载地址:http://jmeter.apache.org/download_jmeter.cgi JMeterPlugins(jmeter插件):http://jmeter-plugins.org/downloads/all/ 下面直接贴使用步骤: 第二步:添加Sampler 第三步:配置HTTP请求 第四步:添加监听器 最后配置线程组: 好了,现在按一下 ctrl+R 开始运行,并发发送请求了.

java线程池模拟并发

public class CountDownLatchTest1 implements Runnable{ final AtomicInteger number = new AtomicInteger(); volatile boolean bol = false; @Override public void run() { System.out.println(number.getAndIncrement()); synchronized (this) { try { if (!bol) {

[编码模式]单线程模拟并发

背景 曾几何时,机器支持的线程数目是一个8位的记录的.这就意味着最多支持的线程数目是255个.如果我们需要同时的执行流上千怎么办. 虽然现在有多核多线程,如果一个CPU已经满足需要你又想减少多线程开发的成本代价. 场景 此模式的核心是一个在单线程中执行的循环.循环通过等待需要的处理的任务. 实例 Task.java public interface Task {     public void execute(); } PrintTask.java public class PrintTask 

CountDownLatch和CyclicBarrier模拟同时并发请求

有时候要测试一下某个功能的并发能力,又不要想借助于其他测试工具,索性就自己写简单的demo模拟一个并发请求就最方便了.如果熟悉jemter的测试某接口的并发能力其实更专业,此处只是自己折腾着玩. CountDownLatch和CyclicBarrier是jdk concurrent包下非常有用的两个并发工具类,它们提供了一种控制并发流程的手段.其实查看源码它们都是在内部维护了一个计数器控制流程的 CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行:Cycli

sql server对并发的处理-乐观锁和悲观锁

假如两个线程同时修改数据库同一条记录,就会导致后一条记录覆盖前一条,从而引发一些问题. 例如: 一个售票系统有一个余票数,客户端每调用一次出票方法,余票数就减一. 情景: 总共300张票,假设两个售票点,恰好在同一时间出票,它们做的操作都是先查询余票数,然后减一. 一般的sql语句: 1 2 3 4 5 6 7 8 9 declare @count as int begin tran     select @count=count from ttt     WAITFOR DELAY '00:0

利用锁机制解决商品表和库存表并发问题

锁机制 问题:当一个脚本被一个客户端访问都正常,但当多个客户端同时并发访问时,这个脚本的结果会出现不正确,这个问题需要使用锁机制来解决.在我们这个网站中需要用到锁的地方就是高并发下定单时减少商品库存量时. 比如例子1: 有一个A 表里面一个ID数字: 现在写一个脚本操作这个A表,每次访问把ID减少: 这个脚使用AB模拟10个用户并发访问时会发现减少的数量并不是10: . 例子2:在高并发下定单时如果要减少库存量,那么库存就会出问题: 加锁之前: 加锁之后: 现在有两种锁机制:MYSQL中的表锁和