Nodejs - 框架类库 - Nodejs异步流程控制Async

简介

Async是一个流程控制工具包,提供了直接而强大的异步功能

应用场景

业务流程逻辑复杂,适应异步编程,减少回调的嵌套

安装

npm insatll async

函数介绍

  • Collections

each: 如果想对同一个集合中的所有元素都执行同一个异步操作。

  1 var async = require(‘async‘);
  2
  3 var t = require(‘./t‘);
  4 var log = t.log;
  5
  6 /**
  7  *
  8  * async提供了三种方式:
  9  * 1. 集合中所有元素并行执行
 10  * 2. 一个一个顺序执行
 11  * 3. 分批执行,同一批内并行,批与批之间按顺序
 12  *
 13  * 如果中途出错,则错误将上传给最终的callback处理。其它已经启动的任务继续执行,未启动的忽略。
 14  */
 15 // each(arr, iterator(item, callback), callback(err))
 16
 17
 18 var arr = [{name:‘Jack‘, delay: 200},
 19            {name:‘Mike‘, delay: 100},
 20            {name:‘Freewind‘, delay: 300}];
 21
 22 /**
 23  * 所有操作并发执行,且全部未出错,最终得到的err为undefined。注意最终callback只有一个参数err。
 24  */
 25 // 1.1
 26 async.each(arr, function(item, callback) {
 27     log(‘1.1 enter: ‘ + item.name);
 28     setTimeout(function(){
 29         log(‘1.1 handle: ‘ + item.name);
 30         callback(null, item.name);
 31     }, item.delay);
 32 }, function(err) {
 33     log(‘1.1 err: ‘ + err);
 34 });
 35 // 输出如下:
 36 // 42.244> 1.1 enter: Jack
 37 // 42.245> 1.1 enter: Mike
 38 // 42.245> 1.1 enter: Freewind
 39 // 42.350> 1.1 handle: Mike
 40 // 42.445> 1.1 handle: Jack
 41 // 42.554> 1.1 handle: Freewind
 42 // 42.554> 1.1 err: undefined
 43
 44
 45 /**
 46  * 如果中途出错,则出错后马上调用最终的callback。其它未执行完的任务继续执行。
 47  */
 48 async.each(arr,function(item, callback) {
 49     log(‘1.2 enter: ‘ +item.name);
 50     setTimeout(function() {
 51         log(‘1.2 handle: ‘ + item.name);
 52         if(item.name===‘Jack‘) {
 53             callback(‘myerr‘);
 54         }
 55     }, item.delay);
 56 }, function(err) {
 57     log(‘1.2 err: ‘ + err);
 58 });
 59 // 输出如下:
 60 // 42.246> 1.2 enter: Jack
 61 // 42.246> 1.2 enter: Mike
 62 // 42.246> 1.2 enter: Freewind
 63 // 42.350> 1.2 handle: Mike
 64 // 42.445> 1.2 handle: Jack
 65 // 42.446> 1.2 err: myerr
 66 // 42.555> 1.2 handle: Freewind
 67
 68 /**
 69  * 与each相似,但不是并行执行。而是一个个按顺序执行。
 70  */
 71 async.eachSeries(arr, function(item, callback) {
 72     log(‘1.3 enter: ‘ + item.name);
 73     setTimeout(function(){
 74         log(‘1.3 handle: ‘ + item.name);
 75         callback(null, item.name);
 76     }, item.delay);
 77 }, function(err) {
 78     log(‘1.3 err: ‘ + err);
 79 });
 80 // 42.247> 1.3 enter: Jack
 81 // 42.459> 1.3 handle: Jack
 82 // 42.459> 1.3 enter: Mike
 83 // 42.569> 1.3 handle: Mike
 84 // 42.569> 1.3 enter: Freewind
 85 // 42.883> 1.3 handle: Freewind
 86 // 42.883> 1.3 err: undefined
 87
 88 /**
 89  * 如果中途出错,则马上把错误传给最终的callback,还未执行的不再执行。
 90  */
 91 async.eachSeries(arr,function(item, callback) {
 92     log(‘1.4 enter: ‘ +item.name);
 93     setTimeout(function() {
 94         log(‘1.4 handle: ‘ + item.name);
 95         if(item.name===‘Jack‘) {
 96             callback(‘myerr‘);
 97         }
 98     }, item.delay);
 99 }, function(err) {
100     log(‘1.4 err: ‘ + err);
101 });
102 // 42.247> 1.4 enter: Jack
103 // 42.460> 1.4 handle: Jack
104 // 42.460> 1.4 err: myerr
105
106 /**
107  * 分批执行,第二个参数是每一批的个数。每一批内并行执行,但批与批之间按顺序执行。
108  */
109 async.eachLimit(arr, 2, function(item, callback) {
110     log(‘1.5 enter: ‘ + item.name);
111     setTimeout(function(){
112         log(‘1.5 handle: ‘ + item.name);
113         callback(null, item.name);
114     }, item.delay);
115 }, function(err) {
116     log(‘1.5 err: ‘ + err);
117 });
118 // 42.247> 1.5 enter: Jack
119 // 42.248> 1.5 enter: Mike
120 // 42.351> 1.5 handle: Mike
121 // 42.352> 1.5 enter: Freewind
122 // 42.461> 1.5 handle: Jack
123 // 42.664> 1.5 handle: Freewind
124 // 42.664> 1.5 err: undefined
125
126 /**
127  * 如果中途出错,错误将马上传给最终的callback。同一批中的未执行完的任务还将继续执行,但下一批及以后的不再执行。
128  */
129 async.eachLimit(arr,2,function(item, callback) {
130     log(‘1.6 enter: ‘ +item.name);
131     setTimeout(function() {
132         log(‘1.6 handle: ‘ + item.name);
133         if(item.name===‘Jack‘) {
134             callback(‘myerr‘);
135         }
136     }, item.delay);
137 }, function(err) {
138     log(‘1.6 err: ‘ + err);
139 });
140 // 42.248> 1.6 enter: Jack
141 // 42.248> 1.6 enter: Mike
142 // 42.352> 1.6 handle: Mike
143 // 42.462> 1.6 handle: Jack
144 // 42.462> 1.6 err: myerr

map: 对集合中的每一个元素,执行某个异步操作,得到结果。所有的结果将汇总到最终的callback里。与each的区别是,each只关心操作不管最后的值,而map关心的最后产生的值。

  1 var async = require(‘async‘);
  2
  3 var t = require(‘./t‘);
  4 var log = t.log;
  5
  6 /**
  7
  8  * 提供了两种方式:
  9  * 1. 并行执行。同时对集合中所有元素进行操作,结果汇总到最终callback里。如果出错,则立刻返回错误以及已经执行完的任务的结果,未执行完的占个空位
 10  * 2. 顺序执行。对集合中的元素一个一个执行操作,结果汇总到最终callback里。如果出错,则立刻返回错误以及已经执行完的结果,未执行的被忽略。
 11  */
 12 // map(arr, iterator(item, callback), callback(err, results))
 13
 14 var arr = [{name:‘Jack‘, delay:200}, {name:‘Mike‘, delay: 100}, {name:‘Freewind‘, delay:300}, {name:‘Test‘, delay: 50}];
 15
 16 /**
 17  * 所有操作均正确执行,未出错。所有结果按元素顺序汇总给最终的callback。
 18  */
 19 // 1.1
 20 async.map(arr, function(item, callback) {
 21     log(‘1.1 enter: ‘ + item.name);
 22     setTimeout(function() {
 23         log(‘1.1 handle: ‘ + item.name);
 24         callback(null, item.name + ‘!!!‘);
 25     }, item.delay);
 26 }, function(err,results) {
 27     log(‘1.1 err: ‘, err);
 28     log(‘1.1 results: ‘, results);
 29 });
 30 // 54.569> 1.1 enter: Jack
 31 // 54.569> 1.1 enter: Mike
 32 // 54.569> 1.1 enter: Freewind
 33 // 54.569> 1.1 enter: Test
 34 // 54.629> 1.1 handle: Test
 35 // 54.679> 1.1 handle: Mike
 36 // 54.789> 1.1 handle: Jack
 37 // 54.879> 1.1 handle: Freewind
 38 // 54.879> 1.1 err:
 39 // 54.879> 1.1 results: [ ‘Jack!!!‘, ‘Mike!!!‘, ‘Freewind!!!‘, ‘Test!!!‘ ]
 40
 41 /**
 42 *  如果中途出错,立刻将错误、以及已经执行完成的结果汇总给最终callback。未执行完的将会在结果数组中用占个空位。
 43 */
 44 async.map(arr, function(item, callback) {
 45     log(‘1.2 enter: ‘ + item.name);
 46     setTimeout(function() {
 47         log(‘1.2 handle: ‘ + item.name);
 48         if(item.name===‘Jack‘) callback(‘myerr‘);
 49         else callback(null, item.name+‘!!!‘);
 50     }, item.delay);
 51 }, function(err, results) {
 52     log(‘1.2 err: ‘, err);
 53     log(‘1.2 results: ‘, results);
 54 });
 55 // 54.569> 1.2 enter: Jack
 56 // 54.569> 1.2 enter: Mike
 57 // 54.569> 1.2 enter: Freewind
 58 // 54.569> 1.2 enter: Test
 59 // 54.629> 1.2 handle: Test
 60 // 54.679> 1.2 handle: Mike
 61 // 54.789> 1.2 handle: Jack
 62 // 54.789> 1.2 err: myerr
 63 // 54.789> 1.2 results: [ undefined, ‘Mike!!!‘, , ‘Test!!!‘ ]
 64 // 54.879> 1.2 handle: Freewind
 65
 66 /**
 67 * 顺序执行,一个完了才执行下一个。
 68 */
 69 async.mapSeries(arr, function(item, callback) {
 70     log(‘1.3 enter: ‘ + item.name);
 71     setTimeout(function() {
 72         log(‘1.3 handle: ‘ + item.name);
 73         callback(null, item.name+‘!!!‘);
 74     }, item.delay);
 75 }, function(err,results) {
 76     log(‘1.3 err: ‘, err);
 77     log(‘1.3 results: ‘, results);
 78 });
 79 // 54.569> 1.3 enter: Jack
 80 // 54.789> 1.3 handle: Jack
 81 // 54.789> 1.3 enter: Mike
 82 // 54.899> 1.3 handle: Mike
 83 // 54.899> 1.3 enter: Freewind
 84 // 55.209> 1.3 handle: Freewind
 85 // 55.209> 1.3 enter: Test
 86 // 55.269> 1.3 handle: Test
 87 // 55.269> 1.3 err:
 88 // 55.269> 1.3 results: [ ‘Jack!!!‘, ‘Mike!!!‘, ‘Freewind!!!‘, ‘Test!!!‘ ]
 89
 90 /**
 91 * 顺序执行过程中出错,只把错误以及执行完的传给最终callback,未执行的忽略。
 92 */
 93 async.mapSeries(arr, function(item, callback) {
 94     log(‘1.4 enter: ‘ + item.name);
 95     setTimeout(function() {
 96         log(‘1.4 handle: ‘ + item.name);
 97         if(item.name===‘Mike‘) callback(‘myerr‘);
 98         else callback(null, item.name+‘!!!‘);
 99     }, item.delay);
100 }, function(err, results) {
101     log(‘1.4 err: ‘, err);
102     log(‘1.4 results: ‘, results);
103 });
104 // 47.616> 1.4 enter: Jack
105 // 47.821> 1.4 handle: Jack
106 // 47.821> 1.4 enter: Mike
107 // 47.931> 1.4 handle: Mike
108 // 47.931> 1.4 err: myerr
109 // 47.932> 1.4 results: [ ‘Jack!!!‘, undefined ]
110
111 /**
112  * 并行执行,同时最多2个函数并行,传给最终callback。
113  */
114 //1.5
115 async.mapLimit(arr,2, function(item, callback) {
116     log(‘1.5 enter: ‘ + item.name);
117     setTimeout(function() {
118         log(‘1.5 handle: ‘ + item.name);
119         if(item.name===‘Jack‘) callback(‘myerr‘);
120         else callback(null, item.name+‘!!!‘);
121     }, item.delay);
122 }, function(err, results) {
123     log(‘1.5 err: ‘, err);
124     log(‘1.5 results: ‘, results);
125 });
126 //57.797> 1.5 enter: Jack
127 //57.800> 1.5 enter: Mike
128 //57.900> 1.5 handle: Mike
129 //57.900> 1.5 enter: Freewind
130 //58.008> 1.5 handle: Jack
131 //58.009> 1.5 err: myerr
132 //58.009> 1.5 results: [ undefined, ‘Mike!!!‘ ]
133 //58.208> 1.5 handle: Freewind
134 //58.208> 1.5 enter: Test
135 //58.273> 1.5 handle: Test

filter: 使用异步操作对集合中的元素进行筛选, 需要注意的是,iterator的callback只有一个参数,只能接收true或false。
reject: reject跟filter正好相反,当测试为true时则抛弃

  1 var async = require(‘async‘);
  2
  3 var t = require(‘./t‘);
  4 var log = t.log;
  5
  6 /**
  7  * 对于出错,该函数没有做出任何处理,直接由nodejs抛出。所以需要注意对Error的处理。
  8  *
  9  * async提供了两种方式:
 10  * 1. 并行执行:filter
 11  * 2. 顺序执行:filterSereis
 12  */
 13 // filter(arr, iterator(item, callback(test)), callback(results))
 14
 15 var arr = [1,2,3,4,5];
 16
 17 /**
 18  * 并行执行,对arr进行筛选。
 19  */
 20 async.filter(arr, function(item, callback) {
 21     log(‘1.1 enter: ‘ + item);
 22     setTimeout(function() {
 23         log(‘1.1 test: ‘ + item);
 24         callback(item>=3);
 25     }, 200);
 26 }, function(results) {
 27     log(‘1.1 results: ‘, results);
 28 });
 29 //16.739> 1.1 enter: 1
 30 //16.749> 1.1 enter: 2
 31 //16.749> 1.1 enter: 3
 32 //16.749> 1.1 enter: 4
 33 //16.749> 1.1 enter: 5
 34 //16.749> 1.3 enter: 1
 35 //16.949> 1.1 test: 1
 36 //16.949> 1.1 test: 2
 37 //16.949> 1.1 test: 3
 38 //16.949> 1.1 test: 4
 39 //16.949> 1.1 test: 5
 40 //16.949> 1.1 results: [ 3, 4, 5 ]
 41
 42
 43 /**
 44 * 串行执行,对arr进行筛选。
 45 */
 46 // 1.3
 47 async.filterSeries(arr, function(item, callback) {
 48     log(‘1.3 enter: ‘ + item);
 49     setTimeout(function() {
 50         log(‘1.3 handle: ‘ + item);
 51         callback(item>=3);
 52     }, 200);
 53 }, function(results) {
 54     log(‘1.3 results: ‘, results);
 55 });
 56 // 16.749> 1.3 enter: 1
 57 // 16.949> 1.3 handle: 1
 58 // 16.949> 1.3 enter: 2
 59 // 17.149> 1.3 handle: 2
 60 // 17.149> 1.3 enter: 3
 61 // 17.369> 1.3 handle: 3
 62 // 17.369> 1.3 enter: 4
 63 // 17.589> 1.3 handle: 4
 64 // 17.589> 1.3 enter: 5
 65 // 17.789> 1.3 handle: 5
 66 // 17.789> 1.3 results: [ 3, 4, 5 ]
 67
 68
 69 /*
 70 * 并行reject
 71 */
 72 // reject(arr, iterator(item, callback(test)), callback(results)
 73 async.reject(arr, function(item, callback) {
 74     log(‘1.4 enter: ‘ + item);
 75     setTimeout(function() {
 76         log(‘1.4 test: ‘ + item);
 77         callback(item>=3);
 78     }, 200);
 79 }, function(results) {
 80     log(‘1.4 results: ‘, results);
 81 });
 82 // 31.359> 1.4 enter: 1
 83 // 31.359> 1.4 enter: 2
 84 // 31.359> 1.4 enter: 3
 85 // 31.359> 1.4 enter: 4
 86 // 31.359> 1.4 enter: 5
 87 // 31.559> 1.4 test: 1
 88 // 31.559> 1.4 test: 2
 89 // 31.559> 1.4 test: 3
 90 // 31.559> 1.4 test: 4
 91 // 31.559> 1.4 test: 5
 92 // 31.569> 1.4 results: [ 1, 2 ]
 93
 94
 95 /**
 96  * 串行执行,对arr进行筛选。
 97  */
 98 // 1.3
 99 async.rejectSeries(arr, function(item, callback) {
100     log(‘1.5 enter: ‘ + item);
101     setTimeout(function() {
102         log(‘1.5 handle: ‘ + item);
103         callback(item>=3);
104     }, 200);
105 }, function(results) {
106     log(‘1.5 results: ‘, results);
107 });
108 //43.592> 1.5 enter: 1
109 //43.799> 1.5 handle: 1
110 //43.800> 1.5 enter: 2
111 //44.004> 1.5 handle: 2
112 //44.007> 1.5 enter: 3
113 //44.210> 1.5 handle: 3
114 //44.211> 1.5 enter: 4
115 //44.412> 1.5 handle: 4
116 //44.413> 1.5 enter: 5
117 //44.614> 1.5 handle: 5
118 //44.616> 1.5 results: [ 1, 2 ]

reduce: 可以让我们给定一个初始值,用它与集合中的每一个元素做运算,最后得到一个值。reduce从左向右来遍历元素,如果想从右向左,可使用reduceRight。

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6
 7 //reduce(arr, memo, iterator(memo,item,callback), callback(err,result))
 8 //alias: inject, foldl
 9
10 var arr = [1,3,5];
11
12 /**
13  * 顺序执行
14  */
15 async.reduce(arr, 100, function(memo, item, callback) {
16     log(‘1.1 enter: ‘ + memo +‘, ‘ + item);
17     setTimeout(function() {
18         callback(null, memo+item);
19     }, 100);
20 },function(err, result) {
21     log(‘1.1 err: ‘, err);
22     log(‘1.1 result: ‘, result);
23 });
24 // 28.789> 1.1 enter: 100, 1
25 // 28.889> 1.1 enter: 101, 3
26 // 28.999> 1.1 enter: 104, 5
27 // 29.109> 1.1 err:
28 // 29.109> 1.1 result: 109
29
30 /**
31  * 顺序执行过程中出错,只把错误传给最终callback,结果是null
32  */
33 async.reduce(arr, 100, function(memo, item, callback) {
34     log(‘1.2 enter: ‘ + memo +‘, ‘ + item);
35     setTimeout(function() {
36         if(item===3) callback(‘myerr‘);
37         else callback(null, memo+item);
38     }, 100);
39 },function(err, result) {
40     log(‘1.2 err: ‘, err);
41     log(‘1.2 result: ‘, result);
42 });
43 // 05.541> 1.2 enter: 100, 1
44 // 05.649> 1.2 enter: 101, 3
45 // 05.760> 1.2 err: myerr
46 // 05.760> 1.2 result:
47
48 /**
49  * 顺序执行从右向左
50  *
51  * alias: foldr
52  */
53 async.reduceRight(arr, 100, function(memo, item, callback) {
54     log(‘1.3 enter: ‘ + memo +‘, ‘ + item);
55     setTimeout(function() {
56         callback(null, memo+item);
57     }, 100);
58 },function(err, result) {
59     log(‘1.3 err: ‘, err);
60     log(‘1.3 result: ‘, result);
61 });
62 // 28.789> 1.3 enter: 100, 5
63 // 28.889> 1.3 enter: 105, 3
64 // 28.999> 1.3 enter: 108, 1
65 // 29.109> 1.3 err:
66 // 29.109> 1.3 result: 109

detect: 用于取得集合中满足条件的第一个元素。

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 // detect(array, iterator(item,callback(test)), callback(result)
 7
 8 var arr = [
 9     {value:1,delay:500},
10     {value:2,delay:200},
11     {value:3,delay:300}
12 ];
13
14 /**
15  *  并行执行,通过t.inc做一个累加器,得到第一个满足条件的结果对象
16  */
17 async.detect(arr, function(item,callback){
18     log(‘1.1 enter: ‘, item.value);
19     t.inc(item.value, function(err,n) {
20         log(‘1.1 handle: ‘, item.value);
21         callback(n%2===0);
22     }, item.delay);
23 }, function(result) {
24     log(‘1.1 result: ‘, result);
25 });
26 // 09.928> 1.1 enter: 1
27 // 09.928> 1.1 enter: 2
28 // 09.928> 1.1 enter: 3
29 // 10.138> 1.1 handle: 2
30 // 10.228> 1.1 handle: 3
31 // 10.228> 1.1 result: { value: 3, delay: 300 }
32 // 10.438> 1.1 handle: 1
33 // 10.438> 1.1 handle: 1
34
35 /**
36  *  串行执行,通过t.inc做一个累加器,得到第一个满足条件的结果对象
37  */
38 async.detectSeries(arr, function(item,callback) {
39     log(‘1.2 enter: ‘, item.value);
40     t.inc(item.value, function(err,n) {
41         log(‘1.1 handle: ‘, item.value);
42         callback(n%2===0);
43     }, item.delay);
44 }, function(result) {
45     log(‘1.2 result: ‘, result);
46 });
47 // 09.928> 1.2 enter: 1
48 // 10.438> 1.2 result: { value: 1, delay: 500 }

sortBy: 对集合内的元素进行排序,依据每个元素进行某异步操作后产生的值,从小到大排序。

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 // sortBy(array, iterator(item,callback(err,result)), callback(err,results))
 7
 8 var arr = [3,6,1];
 9
10 /**
11  * 通过异步迭代器,对集合进行排序
12  */
13 async.sortBy(arr, function(item, callback) {
14     setTimeout(function() {
15         callback(null,item);
16     }, 200);
17 }, function(err,results) {
18     log(‘1.1 err: ‘, err);
19     log(‘1.1 results: ‘, results);
20 });
21 // 26.562> 1.1 err: null
22 // 26.562> 1.1 results: [ 1, 3, 6 ]
23
24 /**
25  * 迭代出错,callback返回err,没有results
26  */
27 async.sortBy(arr, function(item, callback) {
28     setTimeout(function() {
29         if(item===6) callback(‘myerr‘);
30         else callback(null,item);
31     }, 200);
32 }, function(err,results) {
33     log(‘1.2 err: ‘, err);
34     log(‘1.2 results: ‘, results);
35 });
36 // 26.572> 1.2 err: myerr
37 // 26.572> 1.2 results:

some: 当集合中是否有至少一个元素满足条件时,即最终callback得到的值为true,否则为false.

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 // some(arr, iterator(item,callback(test)), callback(result))
 7 //alias: any
 8
 9 var arr = [1,2,3,6];
10
11 /**
12  * 串行执行,集合中至少有一个元素<=3,所以结果为true
13  */
14 // 1.1
15 async.some(arr, function(item,callback){
16     log(‘1.1 enter: ‘,item);
17     setTimeout(function(){
18         log(‘1.1 handle: ‘,item);
19         callback(item<=3);
20     },100);
21 }, function(result) {
22     log(‘1.1 result: ‘, result);
23 });
24 // 36.165> 1.1 enter: 1
25 // 36.165> 1.1 enter: 2
26 // 36.165> 1.1 enter: 3
27 // 36.165> 1.1 enter: 6
28 // 36.275> 1.1 handle: 1
29 // 36.275> 1.1 result: true
30 // 36.275> 1.1 handle: 2
31 // 36.275> 1.1 handle: 3
32 // 36.275> 1.1 handle: 6
33
34
35 /**
36  * 串行执行,集合中没有一个元素>10,所以结果为false
37  */
38 async.some(arr, function(item,callback){
39     log(‘1.2 enter: ‘,item);
40     setTimeout(function(){
41         log(‘1.2 handle: ‘,item);
42         callback(item>10);
43     },100);
44 }, function(result) {
45     log(‘1.2 result: ‘, result);
46 });
47 // 36.165> 1.2 enter: 1
48 // 36.165> 1.2 enter: 2
49 // 36.165> 1.2 enter: 3
50 // 36.165> 1.2 enter: 6
51 // 36.275> 1.2 handle: 1
52 // 36.275> 1.2 handle: 2
53 // 36.275> 1.2 handle: 3
54 // 36.275> 1.2 handle: 6
55 // 36.275> 1.2 result: false

every: 如果集合里每一个元素都满足条件,则传给最终回调的result为true,否则为false

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 // every(arr, iterator(item,callback), callback(result))
 7 //alias: all
 8
 9 var arr = [1,2,3,6];
10
11 /**
12  * 串行执行,集合中所有的元素都<=10,所以为true
13  */
14 async.every(arr, function(item,callback){
15     log(‘1.1 enter: ‘,item);
16     setTimeout(function(){
17         log(‘1.1 handle: ‘,item);
18         callback(item<=10);
19     },100);
20 }, function(result) {
21     log(‘1.1 result: ‘, result);
22 });
23 // 32.113> 1.1 enter: 1
24 // 32.123> 1.1 enter: 2
25 // 32.123> 1.1 enter: 3
26 // 32.123> 1.1 enter: 6
27 // 32.233> 1.1 handle: 1
28 // 32.233> 1.1 handle: 2
29 // 32.233> 1.1 handle: 3
30 // 32.233> 1.1 handle: 6
31 // 32.233> 1.1 result: true
32
33 /**
34  * 串行执行,集合中至少有一个元素不大于2,所以为false
35  */
36 async.every(arr, function(item,callback){
37     log(‘1.2 enter: ‘,item);
38     setTimeout(function(){
39         log(‘1.2 handle: ‘,item);
40         callback(item>2);
41     },100);
42 }, function(result) {
43     log(‘1.2 result: ‘, result);
44 });
45 // 32.123> 1.2 enter: 1
46 // 32.123> 1.2 enter: 2
47 // 32.123> 1.2 enter: 3
48 // 32.123> 1.2 enter: 6
49 // 32.233> 1.2 handle: 1
50 // 32.233> 1.2 result: false
51 // 32.233> 1.2 handle: 2
52 // 32.233> 1.2 handle: 3
53 // 32.233> 1.2 handle: 6

concat: 将多个异步操作的结果合并为一个数组。

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 // concat(arr, iterator(item,callback(err,result)), callback(err,result))
 7
 8 var data = {
 9     aaa: [11,22,33],
10     bbb: [44,55],
11     ccc: 66
12 };
13
14 var keys = [
15     {name: ‘aaa‘, delay: 300},
16     {name: ‘bbb‘, delay: 100},
17     {name: ‘ccc‘, delay: 200}
18 ];
19
20 /**
21  * 以并行方式对集合中各元素进行异步操作,然后把得到的结果合并为一个数组,传给最后的callback。
22  */
23 // 1.1
24 async.concat(keys, function(key,callback) {
25     setTimeout(function() {
26         callback(null, data[key.name]);
27     }, key.delay);
28 }, function(err, values) {
29     log(‘1.1 err: ‘, err);
30     log(‘1.1 values: ‘, values);
31 });
32 // 13.539> 1.1 err:
33 // 13.539> 1.1 values: [ 44, 55, 66, 11, 22, 33 ]
34
35 /**
36  * 如果中途出错,则把错误以及已经完成的操作的结果交给最后callback。未执行完的则忽略。
37  */
38 // 1.2
39 async.concat(keys, function(key,callback) {
40     setTimeout(function() {
41         if(key.name===‘ccc‘) callback(‘myerr‘);
42         else callback(null, data[key.name]);
43     }, key.delay);
44 }, function(err, values) {
45     log(‘1.2 err: ‘, err);
46     log(‘1.2 values: ‘, values);
47 });
48 // 13.439> 1.2 err: myerr
49 // 13.439> 1.2 values: [ 44, 55 ]
50
51 /**
52  * 按数组中的元素顺序来执行异步操作,一个完成后才对下一个进行操作。所有结果会汇集成一个数组交给最后的callback。
53  */
54 // concatSeries(arr, iterator, callback)
55
56 // 1.3
57 async.concatSeries(keys, function(key,callback) {
58     setTimeout(function() {
59         callback(null, data[key.name]);
60     }, key.delay);
61 }, function(err, values) {
62     log(‘1.3 err: ‘, err);
63     log(‘1.3 values: ‘, values);
64 });
65 // 13.859> 1.3 err:
66 // 13.859> 1.3 values: [ 11, 22, 33, 44, 55, 66 ]

  • ControlFlow

series: 串行执行,一个函数数组中的每个函数,每一个函数执行完成之后才能执行下一个函数。

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 /**
 7  * 如果任何一个函数向它的回调函数中传了一个error,则后面的函数都不会被执行,并且会立刻将该error以及已经执行了的函数的结果,传给series中最后那个callback。
 8  * 当所有的函数执行完后(没有出错),则会把每个函数传给其回调函数的结果合并为一个数组,传给series最后的那个callback。
 9  * 还可以json的形式来提供tasks。每一个属性都会被当作函数来执行,并且结果也会以json形式传给series最后的那个callback。这种方式可读性更高一些。
10  */
11 // series(tasks, [callback])
12
13 /**
14  * 全部函数都正常执行。每个函数产生的值将按顺序合并为一个数组,传给最终的callback。
15  */
16 // 1.1
17 async.series([
18     function(cb) { t.inc(3, cb); },
19     function(cb) { t.inc(8, cb); },
20     function(cb) { t.inc(2, cb); }
21 ], function(err, results) {
22     log(‘1.1 err: ‘, err);
23     log(‘1.1 results: ‘, results);
24 });
25 //05.155> 1.1 err: null
26 //05.156> 1.1 results: [ 4, 9, 3 ]
27
28 /**
29  * 中间有函数出错。出错之后的函数不会执行,错误及之前正常执行的函数结果将传给最终的callback。
30  */
31 // 1.2
32 async.series([
33     function(cb) { t.inc(3, cb); },
34     function(cb) { t.err(‘test_err‘, cb); },
35     function(cb) { t.inc(8, cb); }
36 ], function (err, results) {
37     log(‘1.2 err: ‘, err);
38     log(‘1.2 results: ‘, results);
39 });
40 //04.964> 1.2 err: test_err
41 //04.973> 1.2 results: [ 4, undefined ]
42
43 /**
44  * 如果某个函数传的数据是undefined, null, {}, []等,它们会原样传给最终callback。
45  */
46 // 1.3
47 async.series([
48     function(cb) { t.fire(3, cb);},
49     function(cb) { t.fire(undefined, cb); },
50     function(cb) { t.fire(null, cb); },
51     function(cb) { t.fire({}, cb); },
52     function(cb) { t.fire([], cb); },
53     function(cb) { t.fire(‘abc‘, cb) }
54 ], function(err, results) {
55     log(‘1.3 err: ‘, err);
56     log(‘1.3 results: ‘, results);
57 });
58 //05.794> 1.3 err: null
59 //05.795> 1.3 results: [ 3, undefined, null, {}, [], ‘abc‘ ]
60
61 /**
62  * 以json形式传入tasks。其结果也将以json形式传给最终callback。
63  */
64 async.series({
65     a: function(cb) { t.inc(3, cb); },
66     b: function(cb) { t.fire(undefined, cb); },
67     c: function (cb) { t.err(‘myerr‘, cb); },
68     d: function (cb) { t.inc(8, cb); }
69 }, function (err, results) {
70     log(‘1.4 err: ‘, err);
71     log(‘1.4 results: ‘, results);
72 });
73 //05.178> 1.4 err: myerr
74 //05.179> 1.4 results: { a: 4, b: undefined, c: undefined }

parallel: 并行执行多个函数,每个函数都是立即执行,不需要等待其它函数先执行。传给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 /**
 6  * 如果某个函数出错,则立刻将err和已经执行完的函数的结果值传给parallel最终的callback。其它未执行完的函数的值不会传到最终数据,但要占个位置。
 7  * 同时支持json形式的tasks,其最终callback的结果也为json形式。
 8  */
 9 // parallel(tasks, [callback])
10
11 /**
12  * 并行执行多个函数,每个函数的值将按函数声明的先后顺序汇成一个数组,传给最终callback。
13  */
14 // 1.1
15 async.parallel([
16     function(cb) { t.fire(‘a400‘, cb, 400) },
17     function(cb) { t.fire(‘a200‘, cb, 200) },
18     function(cb) { t.fire(‘a300‘, cb, 300) }
19 ], function (err, results) {
20     log(‘1.1 err: ‘, err);
21     log(‘1.1 results: ‘, results);
22 });
23 //36.929> 1.1 err: null
24 //36.930> 1.1 results: [ ‘a400‘, ‘a200‘, ‘a300‘ ]
25
26 /**
27 * 如果中途有个函数出错,则将该err和已经完成的函数值汇成一个数组,传给最终的callback。还没有执行完的函数的值将被忽略,但要在最终数组中占个位置
28 */
29 // 1.2
30 async.parallel([
31     function(cb) { log(‘1.2.1: ‘, ‘start‘); t.fire(‘a400‘, cb, 400) }, // 该函数的值不会传给最终callback,但要占个位置
32     function(cb) { log(‘1.2.2: ‘, ‘start‘); t.err(‘e200‘, cb, 200) },
33     function(cb) { log(‘1.2.3: ‘, ‘start‘); t.fire(‘a100‘, cb, 100) }
34 ], function(err, results) {
35     log(‘1.2 err: ‘, err);
36     log(‘1.2 results: ‘, results);
37 });
38 //36.537> 1.2.1: start
39 //36.540> 1.2.2: start
40 //36.541> 1.2.3: start
41 //36.741> 1.2 err: e200
42 //36.744> 1.2 results: [ , undefined, ‘a100‘ ]
43
44 /**
45 * 以json形式传入tasks,最终results也为json
46 */
47 // 1.3
48 async.parallel({
49     a: function(cb) { t.fire(‘a400‘, cb, 400) },
50     b: function(cb) { t.fire(‘c300‘, cb, 300) }
51 }, function(err, results) {
52     log(‘1.3 err: ‘, err);
53     log(‘1.3 results: ‘, results);
54 });
55 //36.943> 1.3 err: null
56 //36.944> 1.3 results: { b: ‘c300‘, a: ‘a400‘ }
57
58 /**
59 * 如果中途出错,会将err与已经完成的函数值(汇成一个json)传给最终callback。未执行完成的函数值被忽略,不会出现在最终json中。
60 */
61 // 1.4
62 async.parallel({
63     a: function(cb) { t.fire(‘a400‘, cb, 400) }, // 该函数的值不会传给最终的callback
64     b: function(cb) { t.err(‘e300‘, cb, 300) },
65     c: function(cb) { t.fire(‘c200‘, cb, 200) }
66 }, function(err, results) {
67     log(‘1.4 err: ‘, err);
68     log(‘1.4 results: ‘, results);
69 });
70 //36.853> 1.4 err: e300
71 //36.854> 1.4 results: { c: ‘c200‘, b: undefined }
72
73 /**
74  * 并行执行,同时最多2个函数并行,传给最终callback。
75  */
76 //1.5
77 async.parallelLimit({
78     a:function(cb) { log(‘a start‘); t.fire(‘a400‘, cb, 200) },
79     b:function(cb) { log(‘b start‘); t.fire(‘b200‘, cb, 200) },
80     c:function(cb) { log(‘c start‘); t.fire(‘c100‘, cb, 100) },
81     d:function(cb) { log(‘d start‘); t.fire(‘d600‘, cb, 600) },
82     e:function(cb) { log(‘e start‘); t.fire(‘e300‘, cb, 300) }
83 },2, function(err, results) {
84     log(‘1.5 err: ‘, err);
85     log(‘1.5 results: ‘, results);
86 });
87 //26.993> a start
88 //26.996> b start
89 //27.200> c start
90 //27.202> d start
91 //27.313> e start
92 //27.809> 1.5 err:
93 //27.810> 1.5 results: { a: ‘a400‘, b: ‘b200‘, c: ‘c100‘, e: ‘e300‘, d: ‘d600‘ }

whilst: 相当于while,但其中的异步调用将在完成后才会进行下一次循环。
doWhilst: 相当于do…while, doWhilst交换了fn,test的参数位置,先执行一次循环,再做test判断。
until: until与whilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
doUntil: doUntil与doWhilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
forever: 无论条件循环执行,如果不出错,callback永远不被执行。

  1 var async = require(‘async‘);
  2 var t = require(‘./t‘);
  3 var log = t.log;
  4
  5 /**
  6  * 该函数的功能比较简单,条件变量通常定义在外面,可供每个函数访问。在循环中,异步调用时产生的值实际上被丢弃了,因为最后那个callback只能传入错误信息。
  7  *
  8  * 另外,第二个函数fn需要能接受一个函数cb,这个cb最终必须被执行,用于表示出错或正常结束。
  9  */
 10 // whilst(test, fn, callback)
 11
 12 /**
 13 * 正常情况,没有出错。第二个函数虽然是异步调用,但被同步执行。所以第三个函数被调用时,已经过了3秒。
 14 */
 15 // 1.1
 16 var count1 = 0;
 17 async.whilst(
 18     function() { return count1 < 3 },
 19     function(cb) {
 20         log(‘1.1 count: ‘, count1);
 21         count1++;
 22         setTimeout(cb, 1000);
 23     },
 24     function(err) {
 25         // 3s have passed
 26         log(‘1.1 err: ‘, err);
 27     }
 28 );
 29 //10.318> 1.1 count: 0
 30 //11.330> 1.1 count: 1
 31 //12.342> 1.1 count: 2
 32 //13.356> 1.1 err:
 33
 34
 35 /**
 36 * 中途出错。出错后立刻调用第三个函数。
 37 */
 38 // 1.2
 39 var count2 = 0;
 40 async.whilst(
 41     function() { return count2 < 3 },
 42     function(cb) {
 43         log(‘1.2 count: ‘, count2);
 44         if(count2===1) {
 45             t.err(‘myerr‘, cb, 200);
 46         } else {
 47             count2++;
 48             setTimeout(cb, 1000);
 49         }
 50     },
 51     function(err) {
 52         // 2s have passed
 53         log(‘1.2 err: ‘, err);
 54     }
 55 );
 56 //12.805> 1.2 count: 0
 57 //13.824> 1.2 count: 1
 58 //14.026> 1.2 err: myerr
 59
 60 /**
 61 * 第二个函数即使产生值,也会被忽略。第三个函数只能得到err。
 62 */
 63 // 1.3
 64 var count3 = 0;
 65 async.whilst(
 66     function() {return count3 < 3 },
 67     function(cb) {
 68         log(‘1.3 count: ‘, count3);
 69         t.inc(count3++, cb);
 70     },
 71     function(err,result){ // result没有用
 72         log(‘1.3 err: ‘, err);
 73         log(‘1.3 result: ‘, result);
 74     }
 75 );
 76 //45.311> 1.3 count: 0
 77 //45.514> 1.3 count: 1
 78 //45.718> 1.3 count: 2
 79 //45.920> 1.3 err:
 80 //45.923> 1.3 result:
 81
 82 /**
 83 *  doWhilst交换了fn,test的参数位置,先执行一次循环,再做test判断。 和javascript中do..while语法一致。
 84 */
 85 // doWhilst(fn, test, callback)
 86 //1.4
 87 var count4 = 0;
 88 async.doWhilst(
 89     function(cb) {
 90         log(‘1.4 count: ‘, count4);
 91         t.inc(count4++, cb);
 92     },
 93     function() { log("1.4 test"); return count4 < 3 },
 94     function(err,result){ // result没有用
 95         log(‘1.4 err: ‘, err);
 96         log(‘1.4 result: ‘, result);
 97     }
 98 );
 99 //33.643> 1.4 count: 0
100 //33.848> 1.4 test
101 //33.850> 1.4 count: 1
102 //34.054> 1.4 test
103 //34.057> 1.4 count: 2
104 //34.269> 1.4 test
105 //34.270> 1.4 err:
106 //34.270> 1.4 result:
107
108 /**
109 * until与whilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
110 */
111 // 1.5
112 var count5 = 0;
113 async.until(
114     function() { return count5>3 },
115     function(cb) {
116         log(‘1.5 count: ‘, count5);
117         count5++;
118         setTimeout(cb, 200);
119     },
120     function(err) {
121         // 4s have passed
122         log(‘1.5 err: ‘,err);
123     }
124 );
125 //42.498> 1.5 count: 0
126 //42.701> 1.5 count: 1
127 //42.905> 1.5 count: 2
128 //43.107> 1.5 count: 3
129 //43.313> 1.5 err:
130
131 /**
132 * doUntil与doWhilst正好相反,当test为false时循环,与true时跳出。其它特性一致。
133 */
134 // doUntil(fn, test, callback)
135 // 1.6
136 var count6 = 0;
137 async.doUntil(
138     function(cb) {
139         log(‘1.6 count: ‘, count6);
140         count6++;
141         setTimeout(cb, 200);
142     },
143     function() { log(‘1.6 test‘);return count6>3 },
144     function(err) {
145         // 4s have passed
146         log(‘1.6 err: ‘,err);
147     }
148 );
149 //41.831> 1.6 count: 0
150 //42.035> 1.6 test
151 //42.037> 1.6 count: 1
152 //42.241> 1.6 test
153 //42.244> 1.6 count: 2
154 //42.456> 1.6 test
155 //42.457> 1.6 count: 3
156 //42.660> 1.6 test
157 //42.661> 1.6 err:
158
159 /**
160  * forever,无论条件循环执行,如果不出错,callback永远不被执行
161  */
162 //forever(fn, callback)
163 //1.7
164 var count7 = 0;
165 async.forever(
166     function(cb) {
167         log(‘1.7 count: ‘, count7);
168         count7++;
169         setTimeout(cb, 200);
170     },
171     function(err) {
172         log(‘1.7 err: ‘,err);
173     }
174 );
175 //52.770> 1.7 count: 0
176 //52.973> 1.7 count: 1
177 //53.175> 1.7 count: 2
178 //53.377> 1.7 count: 3
179 //53.583> 1.7 count: 4
180 //53.785> 1.7 count: 5
181 //53.987> 1.7 count: 6
182 //54.189> 1.7 count: 7
183 //54.391> 1.7 count: 8
184 //54.593> 1.7 count: 9
185 //54.795> 1.7 count: 10
186 //54.997> 1.7 count: 11
187 //55.199> 1.7 count: 12

waterfall: 按顺序依次执行一组函数。每个函数产生的值,都将传给下一个。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 /**
 6  *  这个函数名为waterfall(瀑布),可以想像瀑布从上到下,中途冲过一层层突起的石头。
 7  *
 8  * 注意,该函数不支持json格式的tasks
 9  */
10 // async.waterfall(tasks, [callback]);
11
12 /**
13  * 所有函数正常执行,每个函数的结果都将变为下一个函数的参数。
14  *
15  * 注意,所有的callback都必须形如callback(err, result),但err参数在前面各函数中无需声明,它被自动处理。
16  */
17 // 1.1
18 async.waterfall([
19     function(cb) { log(‘1.1.1: ‘, ‘start‘); cb(null, 3); },
20     function(n, cb) { log(‘1.1.2: ‘,n); t.inc(n, cb); },
21     function(n, cb) { log(‘1.1.3: ‘,n); t.fire(n*n, cb); }
22 ], function (err, result) {
23     log(‘1.1 err: ‘, err);
24     log(‘1.1 result: ‘, result);
25 });
26 //31.749> 1.1.1: start
27 //31.752> 1.1.2: 3
28 //31.953> 1.1.3: 4
29 //32.156> 1.1 err: null
30 //32.159> 1.1 result: 16
31
32 /**
33 * 中途有函数出错,其err直接传给最终callback,结果被丢弃,后面的函数不再执行。
34 */
35 // 1.2
36 async.waterfall([
37     function(cb) { log(‘1.2.1: ‘, ‘start‘); cb(null, 3); },
38     function(n, cb) { log(‘1.2.2: ‘, n); t.inc(n, cb); },
39     function(n, cb) { log(‘1.2.3: ‘, n); t.err(‘myerr‘, cb); },
40     function(n, cb) { log(‘1.2.4: ‘, n); t.fire(n, cb); }
41 ], function (err, result) {
42     log(‘1.2 err: ‘, err);
43     log(‘1.2 result: ‘, result);
44 });
45 //44.935> 1.2.1: start
46 //44.939> 1.2.2: 3
47 //45.140> 1.2.3: 4
48 //45.344> 1.2 err: myerr
49 //45.348> 1.2 result:
50
51 /**
52 * 注意: 以json形式传入tasks,将不会被执行!!
53 */
54 async.waterfall({
55     a: function(cb) { log(‘1.3.1: ‘, ‘start‘); cb(null, 3); },
56     b: function(n, cb) { log(‘1.3.2: ‘, n); t.inc(n, cb); },
57     c: function(n, cb) { log(‘1.3.3: ‘, n); t.fire(n*n, cb); }
58 }, function (err, result) {
59     log(‘1.3 err: ‘, err);
60     log(‘1.3 result: ‘, result);
61 });
62 //49.222> 1.3 err: [Error: First argument to waterfall must be an array of functions]
63 //49.228> 1.3 result:

compose: 创建一个包括一组异步函数的函数集合,每个函数会消费上一次函数的返回值。把f(),g(),h()异步函数,组合成f(g(h()))的形式,通过callback得到返回值。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 // compose(fn1, fn2...)
 6
 7 /**
 8  * 通过compose组合,f(g(h()))的形式,从内层到外层的执行的顺序。
 9  */
10 //1.1
11 function f(n,callback){
12     log(‘1.1.f enter: ‘,n);
13     setTimeout(function () {
14         callback(null, n + 1);
15     }, 10);
16 }
17 function g(n, callback) {
18     log(‘1.1.g enter: ‘,n);
19     setTimeout(function () {
20         callback(null, n * 2);
21     }, 10);
22 }
23 function h(n, callback) {
24     log(‘1.1.h enter: ‘,n);
25     setTimeout(function () {
26         callback(null, n - 10);
27     }, 10);
28 }
29 var fgh = async.compose(f,g,h);
30 fgh(4,function(err,result){
31     log(‘1.1 err: ‘, err);
32     log(‘1.1 result: ‘, result);
33 });
34 //05.307> 1.1.h enter: 4
35 //05.329> 1.1.g enter: -6
36 //05.341> 1.1.f enter: -12
37 //05.361> 1.1 err: null
38 //05.362> 1.1 result: -11

applyEach: 实现给一数组中每个函数传相同参数,通过callback返回。如果只传第一个参数,将返回一个函数对象,我可以传参调用。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 // applyEach(fns, args..., callback)
 6
 7 /**
 8  * 异步执行,给数组中的函数,他们有相同的参数。
 9  */
10 //1.1
11 async.applyEach([
12     function (name,cb) {
13         setTimeout(function () {
14             log("1.1 handler: " + name + " A");
15             cb(null, name);
16         }, 500);
17     }, function (name,cb) {
18         setTimeout(function () {
19             log("1.1 handler: " + name + " B");
20             cb(null, name);
21         }, 150);
22     }
23 ], ‘Hello‘, function (err) {
24     log(‘1.1 err: ‘, err);
25 });
26 //06.739> 1.1 handler: Hello B
27 //07.079> 1.1 handler: Hello A
28 //07.080> 1.1 err: null
29
30 /**
31  *  异步执行,当只设置第一参数后,得到函数对象,再传参调用这个函数。
32  */
33 //1.2
34 var fn = async.applyEach([
35     function (name,cb) {
36         setTimeout(function () {
37             log("1.2 handler: " + name + " A");
38         }, 500);
39     }, function (name,cb) {
40         setTimeout(function () {
41             log("1.2 handler: " + name + " B");
42         }, 150);
43     }
44 ]);
45 fn("simgle",function(err){
46     log(‘err: ‘,err);
47 });
48 //29.351> 1.2 handler: simgle B
49 //29.688> 1.2 handler: simgle A
50
51 /**
52  *   applyEachSeries与applyEach唯一不同的是,数组的函数同步执行。
53  */
54 //applyEachSeries(arr, args..., callback)
55 //1.3
56 async.applyEachSeries([
57     function (name,cb) {
58         setTimeout(function () {
59             log("1.3 handler: " + name + " A");
60             cb(null, name);
61         }, 500);
62     }, function (name,cb) {
63         setTimeout(function () {
64             log("1.3 handler: " + name + " B");
65             cb(null, name);
66         }, 150);
67     }
68 ], "aaa", function (err) {
69     log(‘1.3 err: ‘, err);
70 });
71 //10.669> 1.3 handler: aaa A
72 //10.831> 1.3 handler: aaa B
73 //10.834> 1.3 err: null

queue: 是一个串行的消息队列,通过限制了worker数量,不再一次性全部执行。当worker数量不够用时,新加入的任务将会排队等候,直到有新的worker可用。

  1 var async = require(‘async‘);
  2 var t = require(‘./t‘);
  3 var log = t.log;
  4
  5 /*
  6  * 该函数有多个点可供回调,如worker用完时、无等候任务时、全部执行完时等。
  7  */
  8 // queue(worker, concurrency)
  9
 10 /**
 11  * 定义一个queue,设worker数量为2
 12  */
 13 var q = async.queue(function(task, callback) {
 14     log(‘worker is processing task: ‘, task.name);
 15     task.run(callback);
 16 }, 2);
 17
 18 /**
 19  * 监听:如果某次push操作后,任务数将达到或超过worker数量时,将调用该函数
 20  */
 21 q.saturated = function() {
 22     log(‘all workers to be used‘);
 23 }
 24
 25 /**
 26  * 监听:当最后一个任务交给worker时,将调用该函数
 27  */
 28 q.empty = function() {
 29     log(‘no more tasks wating‘);
 30 }
 31
 32 /**
 33  * 监听:当所有任务都执行完以后,将调用该函数
 34  */
 35 q.drain = function() {
 36     log(‘all tasks have been processed‘);
 37 }
 38
 39 /**
 40 * 独立加入2个任务
 41 */
 42 q.push({name:‘t1‘, run: function(cb){
 43     log(‘t1 is running, waiting tasks: ‘, q.length());
 44     t.fire(‘t1‘, cb, 400); // 400ms后执行
 45 }}, function(err) {
 46     log(‘t1 executed‘);
 47 });
 48 log(‘pushed t1, waiting tasks: ‘, q.length());
 49
 50 q.push({name:‘t2‘,run: function(cb){
 51     log(‘t2 is running, waiting tasks: ‘, q.length());
 52     t.fire(‘t2‘, cb, 200); // 200ms后执行
 53 }}, function(err) {
 54     log(‘t2 executed‘);
 55 });
 56 log(‘pushed t2, waiting tasks: ‘, q.length());
 57 //54.448> pushed t1, waiting tasks: 1
 58 //54.451> all workers to be used
 59 //54.452> pushed t2, waiting tasks: 2
 60 //54.452> worker is processing task: t1
 61 //54.453> t1 is running, waiting tasks: 1
 62 //54.455> no more tasks wating
 63 //54.455> worker is processing task: t2
 64 //54.455> t2 is running, waiting tasks: 0
 65 //54.656> t2 executed
 66 //54.867> t1 executed
 67 //54.868> all tasks have been processed
 68
 69
 70 // 同时加入多个任务
 71 q.push([
 72     {
 73         name:‘t3‘, run: function(cb){
 74             log(‘t3 is running, waiting tasks: ‘, q.length());
 75             t.fire(‘t3‘, cb, 300); // 300ms后执行
 76         }
 77     },{
 78         name:‘t4‘, run: function(cb){
 79             log(‘t4 is running, waiting tasks: ‘, q.length());
 80             t.fire(‘t4‘, cb, 500); // 500ms后执行
 81         }
 82     },{
 83         name:‘t5‘, run: function(cb){
 84             log(‘t5 is running, waiting tasks: ‘, q.length());
 85             t.fire(‘t5‘, cb, 100); // 100ms后执行
 86         }
 87     },{
 88         name:‘t6‘, run: function(cb){
 89             log(‘t6 is running, waiting tasks: ‘, q.length());
 90             t.fire(‘t6‘, cb, 400); // 400ms后执行
 91         }
 92     }
 93 ], function(err) {
 94     log(‘err: ‘,err);
 95 });
 96 log(‘pushed t3,t4,t5,t6 into queue, waiting tasks: ‘, q.length());
 97 //53.755> all workers to be used
 98 //53.758> pushed t3,t4,t5,t6 into queue, waiting tasks: 4
 99 //53.759> worker is processing task: t3
100 //53.760> t3 is running, waiting tasks: 3
101 //53.762> worker is processing task: t4
102 //53.762> t4 is running, waiting tasks: 2
103 //54.073> err: null
104 //54.074> worker is processing task: t5
105 //54.076> t5 is running, waiting tasks: 1
106 //54.183> err: null
107 //54.184> no more tasks wating
108 //54.185> worker is processing task: t6
109 //54.186> t6 is running, waiting tasks: 0
110 //54.265> err: null
111 //54.588> err: null
112 //54.589> all tasks have been processed

cargo: 一个串行的消息队列,类似于queue,通过限制了worker数量,不再一次性全部执行。不同之处在于,cargo每次会加载满额的任务做为任务单元,只有任务单元中全部执行完成后,才会加载新的任务单元。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 // cargo(worker, [payload])
 6
 7 /**
 8  * 创建cargo实例
 9  */
10 var cargo = async.cargo(function (tasks, callback) {
11     for(var i=0; i<tasks.length; i++){
12         log(‘start ‘ + tasks[i].name);
13     }
14     callback();
15 }, 2);
16
17
18 /**
19  * 监听:如果某次push操作后,任务数将达到或超过worker数量时,将调用该函数
20  */
21 cargo.saturated = function() {
22     log(‘all workers to be used‘);
23 }
24
25 /**
26  * 监听:当最后一个任务交给worker时,将调用该函数
27  */
28 cargo.empty = function() {
29     log(‘no more tasks wating‘);
30 }
31
32 /**
33  * 监听:当所有任务都执行完以后,将调用该函数
34  */
35 cargo.drain = function() {
36     log(‘all tasks have been processed‘);
37 }
38
39 /**
40  * 增加新任务
41  */
42 cargo.push({name: ‘A‘}, function (err) {
43     t.wait(300);
44     log(‘finished processing A‘);
45 });
46 cargo.push({name: ‘B‘}, function (err) {
47     t.wait(600);
48     log(‘finished processing B‘);
49 });
50 cargo.push({name: ‘C‘}, function (err) {
51     t.wait(500);
52     log(‘finished processing C‘);
53 });
54 cargo.push({name: ‘D‘}, function (err) {
55     t.wait(100);
56     log(‘finished processing D‘);
57 });
58 cargo.push({name: ‘E‘}, function (err) {
59     t.wait(200);
60     log(‘finished processing E‘);
61 });
62 //40.016> all workers to be used
63 //40.020> no more tasks wating
64 //40.020> start A
65 //40.020> start B
66 //40.322> finished processing A
67 //40.923> finished processing B
68 //40.923> no more tasks wating
69 //40.924> start C
70 //40.924> start D
71 //41.425> finished processing C
72 //41.526> finished processing D
73 //41.526> no more tasks wating
74 //41.527> start E
75 //41.728> finished processing E
76 //41.728> all tasks have been processed
77 //41.729> all tasks have been processed
78 //41.729> all tasks have been processed
79 //41.729> all tasks have been processed
80 //41.730> all tasks have been processed

auto: 用来处理有依赖关系的多个任务的执行。

  1 var async = require(‘async‘);
  2 var t = require(‘./t‘);
  3 var log = t.log;
  4
  5 /**
  6  * 比如某些任务之间彼此独立,可以并行执行;但某些任务依赖于其它某些任务,只能等那些任务完成后才能执行。
  7  * 虽然我们可以使用parallel和series结合起来实现该功能,但如果任务之间关系复杂,则代码会相当复杂,以后如果想添加一个新任务,也会很麻烦。
  8  * 这时使用auto,则会事半功倍。
  9  *
 10  * 如果有任务中途出错,则会把该错误传给最终callback,所有任务(包括已经执行完的)产生的数据将被忽略。
 11  * 如果不关心错误和最终数据,可以不用写最后那个callback。
 12  */
 13 // async.auto(tasks, [callback])
 14
 15 /**
 16  * 我要写一个程序,它要完成以下几件事:
 17  * 1. 从某处取得数据
 18  * 2. 在硬盘上建立一个新的目录
 19  * 3. 将数据写入到目录下某文件
 20  * 4. 发送邮件,将文件以附件形式发送给其它人。
 21  *
 22  * 分析该任务,可以知道1与2可以并行执行,3需要等1和2完成,4要等3完成。
 23  * 可以按以下方式来使用auto函数。
 24  */
 25 // 1.1
 26 async.auto({
 27     getData: function (callback) {
 28         setTimeout(function(){
 29             console.log(‘1.1: got data‘);
 30              callback(null, ‘mydata‘);
 31         }, 300);
 32     },
 33     makeFolder: function (callback) {
 34         setTimeout(function(){
 35             console.log(‘1.1: made folder‘);
 36             callback(null, ‘myfolder‘);
 37         }, 200);
 38     },
 39     writeFile: [‘getData‘, ‘makeFolder‘, function(callback) {
 40         setTimeout(function(){
 41             console.log(‘1.1: wrote file‘);
 42             callback(null, ‘myfile‘);
 43         }, 300);
 44     }],
 45     emailFiles: [‘writeFile‘, function(callback, results) {
 46         log(‘1.1: emailed file: ‘, results.writeFile);
 47         callback(null, results.writeFile);
 48     }]
 49 }, function(err, results) {
 50     log(‘1.1: err: ‘, err);
 51     log(‘1.1: results: ‘, results);
 52 });
 53 //1.1: made folder
 54 //1.1: got data
 55 //1.1: wrote file
 56 //20.120> 1.1: emailed file: myfile
 57 //20.125> 1.1: err: null
 58 //20.127> 1.1: results: { makeFolder: ‘myfolder‘,
 59 //    getData: ‘mydata‘,
 60 //    writeFile: ‘myfile‘,
 61 //    emailFiles: ‘myfile‘ }
 62
 63
 64
 65 /**
 66 * 如果中途出错,则会把错误交给最终callback,执行完任务的传给最终callback。未执行完成的函数值被忽略
 67 */
 68 // 1.2
 69 async.auto({
 70     getData: function (callback) {
 71         setTimeout(function(){
 72             console.log(‘1.2: got data‘);
 73             callback(null, ‘mydata‘);
 74         }, 300);
 75     },
 76     makeFolder: function (callback) {
 77         setTimeout(function(){
 78             console.log(‘1.2: made folder‘);
 79             callback(null, ‘myfolder‘);
 80         }, 200);
 81     },
 82     writeFile: [‘getData‘, ‘makeFolder‘, function(callback, results) {
 83         setTimeout(function(){
 84             console.log(‘1.2: wrote file‘);
 85             callback(‘myerr‘);
 86         }, 300);
 87     }],
 88     emailFiles: [‘writeFile‘, function(callback, results) {
 89         console.log(‘1.2: emailed file: ‘ + results.writeFile);
 90         callback(‘err sending email‘, results.writeFile);
 91     }]
 92 }, function(err, results) {
 93     log(‘1.2 err: ‘, err);
 94     log(‘1.2 results: ‘, results);
 95 });
 96 //1.2: made folder
 97 //1.2: got data
 98 //1.2: wrote file
 99 //51.399> 1.2 err: myerr
100 //51.401> 1.2 results: { makeFolder: ‘myfolder‘,
101 //    getData: ‘mydata‘,
102 //    writeFile: undefined }

iterator: 将一组函数包装成为一个iterator,初次调用此iterator时,会执行定义中的第一个函数并返回第二个函数以供调用。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 /**
 6  * 也可通过手动调用 next() 得到以下一个函数为起点的新的iterator。
 7  * 该函数通常由async在内部使用,但如果需要时,也可在我们的代码中使用它。
 8  */
 9 // async.iterator(tasks)
10
11 var iter = async.iterator([
12     function () {log(‘I am 111‘)},
13     function () {log(‘I am 222‘)},
14     function () {log(‘I am 333‘)}
15 ]);
16
17 /**
18 * 直接调用(),会执行当前函数,并返回一个由下个函数为起点的新的iterator
19 */
20 //1.1
21 log(‘1.1 iter()‘);
22 var it1 = iter();
23 it1();
24 it1();
25 //28.368> 1.1 iter()
26 //28.371> I am 111
27 //28.372> I am 222
28 //28.372> I am 222
29
30 /**
31 * 通过iter()来调用下一个函数
32 */
33 log(‘1.2 iter()‘);
34 var it2 = iter();
35 var it3 = it2();
36 var it4 = it3();
37 //it4(); // 这句代码执行会报错
38 log(it4); // => ‘null‘
39 //32.449> 1.2 iter()
40 //32.452> I am 111
41 //32.452> I am 222
42 //32.453> I am 333
43 //32.454> null
44
45 /**
46  * 调用next(),不会执行当前函数,直接返回由下个函数为起点的新iterator
47  * 对于同一个iterator,多次调用next(),不会影响自己
48  */
49 //1.3
50 log(‘1.3 iter()‘);
51 var it5 = iter.next();
52 it5();
53 var it6 = iter.next().next();
54 it6();
55 iter();
56 //39.895> 1.3 iter()
57 //39.898> I am 222
58 //39.899> I am 333
59 //39.899> I am 111

apply: 可以让我们给一个函数预绑定多个参数并生成一个可直接调用的新函数,简化代码。

 1 var async = require(‘async‘);
 2
 3 var t = require(‘./t‘);
 4 var log = t.log;
 5
 6 /**
 7  * function(callback) { t.inc(3, callback); }
 8  * 等价于:
 9  * async.apply(t.inc, 3);
10  */
11 // apply(function, arguments..)
12
13 /**
14  * 通过名字绑定函数t.inc, t.fire,作为新函数给parallel调用
15  */
16 //1.1
17 async.parallel([
18     async.apply(t.inc, 3),
19     async.apply(t.fire, 100)
20 ], function (err, results) {
21     log(‘1.1 err: ‘, err);
22     log(‘1.1 results: ‘, results);
23 });
24 //58.605> 1.1 err: null
25 //58.613> 1.1 results: [ 4, 100 ]
26
27 /**
28  * 构造一个加法函数,通过apply简化代码
29  */
30 //1.2
31 function inc(a,b,callback,timeout){
32     var timeout = timeout || 200;
33     t.wait(200);
34     setTimeout(function() {
35         callback(null, a+b);
36     }, timeout);
37 }
38 var fn = async.apply(inc, 1, 2);
39 fn(function(err, n){
40     log(‘1.2 inc: ‘ + n);
41 });
42 //58.616> 1.2 inc: 3

nextTick: 与nodejs的nextTick一样,再最后调用函数。

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 /**
 6  * 但在浏览器端,只能使用setTimeout(callback,0),但这个方法有时候会让其它高优先级的任务插到前面去。
 7  * 所以提供了这个nextTick,让同样的代码在服务器端和浏览器端表现一致。
 8  */
 9 // nextTick(callback)
10
11 var calls = [];
12 async.nextTick(function() {
13     calls.push(‘two‘);
14 });
15 async.nextTick(function() {
16     log(‘1.1‘,calls);
17 });
18 calls.push(‘one‘);
19 log(‘1.2‘,calls);
20 async.nextTick(function() {
21     log(‘1.3‘,calls);
22 });
23 //09.838> 1.2[ ‘one‘ ]
24 //09.842> 1.1[ ‘one‘, ‘two‘ ]
25 //09.843> 1.3[ ‘one‘, ‘two‘ ]

times: 异步运行,times可以指定调用几次,并把结果合并到数组中返回
timesSeries: 与time类似,唯一不同的是同步执行

 1 var async = require(‘async‘);
 2 var t = require(‘./t‘);
 3 var log = t.log;
 4
 5 // times(n, callback)
 6
 7 function delay(n){return (n+12) % 7 *100;}
 8 var createUser = function(id, callback) {
 9     callback(null, {
10         id: ‘user‘ + id,
11         delay:delay(id)
12     })
13 }
14
15 /**
16  * 异步执行,调用3次createUser函数,结果被合并到数组返回
17  */
18 //1.1
19 async.times(3, function(n, callback){
20     log("1.1 enter: "+ n);
21     setTimeout(function(){
22         log(‘1.1 handler: ‘,n);
23         createUser(n, function(err, user) {
24             callback(err, user)
25         })
26     },delay(n));
27 }, function(err, users) {
28     log(‘1.1 err: ‘,err);
29     log(‘1.1 result: ‘,users);
30 });
31 //07.397> 1.1 enter: 0
32 //07.400> 1.1 enter: 1
33 //07.401> 1.1 enter: 2
34 //07.412> 1.1 handler: 2
35 //07.912> 1.1 handler: 0
36 //08.009> 1.1 handler: 1
37 //08.010> 1.1 err: null
38 //08.011> 1.1 result: [ { id: ‘user0‘, delay: 500 },
39 //    { id: ‘user1‘, delay: 600 },
40 //    { id: ‘user2‘, delay: 0 } ]
41
42 /**
43 *  timesSeries与time唯一不同的是,同步执行
44 */
45 //timesSeries(n, callback)
46
47 /**
48  * 同步执行,调用3次createUser函数,结果被合并到数组返回
49  */
50 //1.2
51 async.timesSeries(3, function(n, callback){
52     log("1.2 enter: "+ n);
53     setTimeout(function(){
54         log(‘1.2 handler: ‘,n);
55         createUser(n, function(err, user) {
56             callback(err, user)
57         })
58     },delay(n));
59 }, function(err, users) {
60     log(‘1.2 err: ‘,err);
61     log(‘1.2 result: ‘,users);
62 });
63 //16.642> 1.2 enter: 0
64 //17.159> 1.2 handler: 0
65 //17.162> 1.2 enter: 1
66 //17.763> 1.2 handler: 1
67 //17.767> 1.2 enter: 2
68 //17.778> 1.2 handler: 2
69 //17.779> 1.2 err: null
70 //17.780> 1.2 result: [ { id: ‘user0‘, delay: 500 },
71 //    { id: ‘user1‘, delay: 600 },
72 //    { id: ‘user2‘, delay: 0 } ]

  • Utils

memoize: 让某一个函数在内存中缓存它的计算结果。对于相同的参数,只计算一次,下次就直接拿到之前算好的结果。
unmemoize: 让已经被缓存的函数,返回不缓存的函数引用。
log: 执行某异步函数,并记录它的返回值,日志输出。
dir: 与log类似,不同之处在于,会调用浏览器的console.dir()函数,显示为DOM视图。
fire: 直接将obj的内容返回给async
err: 模拟一个错误的产生,让async各个函数末尾的callback接收到。
wait: 刻意等待mils的时间,mils的单位是毫秒。

时间: 2024-10-26 14:01:02

Nodejs - 框架类库 - Nodejs异步流程控制Async的相关文章

Nodejs中使用异步流程控制Async

首先,我们都知道,Node基于事件驱动的异步I/O架构,所谓异步就是非阻塞,说白了就是一个事件执行了,我不必等待它执行完成后我才能执行下一个事件.所以在Node环境中的模块基本都是异步的,上一篇说到我在项目中改用了easymysql模块代替mysql模块,两个模块作查询的操作都是异步的,所以要实现嵌套查询往往会很麻烦,而且很大可能会报错.为此,为了实现查询同步,我引进了异步流程控制async模块,让js异步操作变成同步操作,这样一方面方便阅读理解,另一方面能够很好实现需求的目标,亲测有效~ up

异步流程控制-7行代码学会co模块

首先请原谅我的标题党(●-●),tj 大神的 co 模块源码200多行,显然不是我等屌丝能随便几行代码就能重写的.只是当今大家都喜欢<7天学会xx语言>之类的速效仙丹,于是我也弄个类似的名字<7行代码学会co模块>来博眼球. 为了避免被拖出去弹小JJ,还是先放出所谓的 7 行代码给大家压压惊: function co(gen) { var it = gen(); var ret = it.next(); ret.value.then(function(res) { it.next(

js 异步流程控制之 avQ(avril.queue)

废话前言 写了多年的js,遇到过最蛋疼的事情莫过于callback hell, 相信大家也感同身受. 业界许多大大也为此提出了很多不错的解决方案,我所了解的主要有: 朴灵 event proxy, 简单明了容易上手 老赵的 wind.js, 写起来最舒坦,最能表达程序顺序执行逻辑 Promise,个人感觉为解决一个坑引入另外一个坑,写出来的代码一大坨,代码可读性最差 我这人闲着没事也爱折腾,我也自己造轮子,不为别的只为自己代码写的舒服流畅. 传送门:目前只支持 node.js 环境,以后有时间再

Node.js异步流程控制

原文地址:Node.js异步流程控制 原文地址:https://www.cnblogs.com/edward852/p/8580917.html

NodeJS异步流程控制简单介绍

转自:http://www.jianshu.com/p/cc90f44bdf89 有这样一个需求,用户注册的时候,判断用户名和邮箱是否已经被占用. 用户注册 传统的实现思路 根据用户名查找记录,如果存在记录,证明用户名已被占用 根据邮箱查找记录,如果存在记录,证明又想已被占用 但是在nodejs中,大家都知道,各种的回调.简单的查询数据库都是异步的.你可能会这么写: User.findOne({username: user.username}, function (err, doc) { if(

异步流程控制之Async模块

一.Async模块介绍 Async是一个使用比较广泛的JavaScript异步流程控制模块,除了可以在Node.js上运行,还可以在浏览器端运行. Async模块提供了约20多个实用的函数来帮助我们理清在实用Node.js过程中各种复杂的回调. 二.Async函数介绍 Async的内容分为三部分: 流程控制(Control Flow):简化十种常见流程的处理 集合处理(Collections):如何使用异步操作处理集合中的数据 工具类(Utils):几个常用的工具类 1). 集合: Collec

阿里2018前端测评题(Promise异步流程控制)

用Promise控制异步流程,三个异步任务,时间可能有先后,但是要按照想要的顺序输出. 我这里用四种方法解决,其实也就是考察你对Promise的理解,基础题了. //实现mergePromise函数,把传进去的数组顺序先后执行, //并且把返回的数据先后放到数组data中 const timeout = ms => new Promise((resolve, reject) => { setTimeout(() => { resolve(); }, ms); }); const ajax

async异步流程控制

http://cnodejs.org/topic/54acfbb5ce87bace2444cbfb 先安装:G:\www\nodejs\one\models>npm install async --save-dev 1.串行无关联:async.series(tasks,callback);多个函数依次执行,之间没有数据交换,其中一个函数出错,后续函数不再执行//匿名函数前必须有键名(one:,two:)async.series({ one: function(callback){ callbac

珠峰培训node正式课笔记 -- 【async】任务流程控制,异步流程控制

var async = require('async'); // series 串形任务 console.time('cost') async.series({ two:function(callback){ setTimeout(function(){ console.log('串形任务two'); //第一个参数 错误原因,当为真时,接收函数err参数接收到原因,并定为报错,停止执行后边的任务 callback(null,'串形任务two 执行完毕'); },1000) }, one:fun