async源码学习 - waterfall函数的使用及原理实现

waterfall函数会连续执行数组中的函数,每次通过数组下一个函数的结果。然而,数组任务中的任意一个函数结果传递失败,那么该函数的下一个函数将不会执行,并且主回调函数立马把错误作为参数执行。

以上是我翻译的,我都不知道翻译的什么鬼。

其实该函数的作用就是: 上一个异步函数返回结果可以传给下一个异步函数,如果传递过程中,第一个参数出错了也就是真值的话,下一个回调函数将会停止调用,并且直接调用waterfall函数的第二个参数,其实也是个回调函数。并且把错误的参数传过去。

先看看官网的demo:
        async.waterfall([
            fn1,
            fn2,
            fn3
        ], function(err, arg1) {
            console.log(err);
            console.log(arg1);
        });

        function fn1(next) {
            next(null, ‘11‘);
        };

        function fn2(arg1, next) {
            console.log(arg1);
            next(null, ‘22‘, ‘33‘);
        };

        function fn3(arg1, arg2, next) {
            console.log(arg1);
            console.log(arg2);
            next(null, ‘done‘);
        };
        async.waterfall([function(aaa) {
            console.log(11);
            aaa(null, ‘one‘);
        }, function(arg1, bbb) {
            console.log(arg1);
            bbb(null, ‘two‘, ‘three‘);
        }, function(arg1, arg2, ccc) {
            console.log(arg1);
            console.log(arg2);
            ccc(null, ‘down‘, ‘down2‘);
        }], function(err, result, res2) {
            console.log(err);
            console.log(result);
            console.log(res2);
        });

自己搞个创建文件的实例,看看

class File {
            constructor() {}

            // 创建文件
            createFile(callback) {
                setTimeout(() => {
                    if (0) {
                        console.log(‘创建文件失败‘);
                        callback(‘err‘);
                    } else {
                        console.log(‘创建文件成功‘);
                        callback(null);
                    };
                }, 3000);
            }

            // 写文件
            writeFile(callback) {
                setTimeout(() => {
                    if (1) {
                        console.log(‘写文件失败‘);
                        callback(‘err‘);
                    } else {
                        console.log(‘写文件成功‘);
                        callback(null);
                    };
                }, 2000);
            }

            // 读文件
            readFile(callback) {
                setTimeout(() => {
                    if (0) {
                        console.log(‘读文件失败‘);
                        callback(‘err‘);
                    } else {
                        console.log(‘读文件成功‘);
                        callback(null, ‘I love async!‘);
                    };
                }, 4000);
            }
        };
        let file = new File();

        async.waterfall([function(callback) {
            file.createFile(function(err) {
                if (!err) {
                    callback(null, ‘createFile Ok‘);
                } else {
                    callback(‘createFileFail‘);
                };
            });
        }, function(err, callback) {
            file.writeFile(function(err) {
                if (!err) {
                    callback(null, ‘writeFile Ok‘);
                } else {
                    callback(‘writeFileFail‘);
                };
            });
        }, function(err, callback) {
            file.readFile(function(err) {
                if (!err) {
                    callback(null, ‘readFile Ok‘);
                } else {
                    callback(‘readFileFail‘);
                };
            });
        }], function(err, result) {
            console.log(err);
            console.log(result);
        });

我一直纳闷,他怎么做到,上一个异步什么时候做完后,通知下一个异步开始执行,并且把参数传给下一个异步函数的。看看源码实现:

/**
 * Created by Sorrow.X on 2017/5/28.
 */

var waterfall = (function() {

    var isArray = Array.isArray;    // 把数组的isArray赋给isArray变量

    // 是否支持Symbol
    var supportsSymbol = typeof Symbol === ‘function‘;

    var setImmediate$1 = wrap(_defer);

    function wrap(defer) {
        return function (fn/*, ...args*/) {
            var args = slice(arguments, 1);
            defer(function () {
                fn.apply(null, args);
            });
        };
    };

    // 是否是异步
    function isAsync(fn) {
        return supportsSymbol && fn[Symbol.toStringTag] === ‘AsyncFunction‘;
    };

    // 空函数
    function noop() {
        // No operation performed.
    };

    // 一次(偏函数)
    function once(fn) {    // fn: waterfall的第二个参数(回调函数)
        return function () {
            if (fn === null) return;
            var callFn = fn;
            fn = null;    // 把上级函数作用域中的fn置空
            callFn.apply(this, arguments);    // 调用回调函数
        };
    };

    // 包装成异步
    function wrapAsync(asyncFn) {
        return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
    };

    function asyncify(func) {
        return initialParams(function (args, callback) {
            var result;
            try {
                result = func.apply(this, args);
            } catch (e) {
                return callback(e);
            }
            // if result is Promise object
            if (isObject(result) && typeof result.then === ‘function‘) {
                result.then(function(value) {
                    invokeCallback(callback, null, value);
                }, function(err) {
                    invokeCallback(callback, err.message ? err : new Error(err));
                });
            } else {
                callback(null, result);
            }
        });
    };

    function isObject(value) {
        var type = typeof value;
        return value != null && (type == ‘object‘ || type == ‘function‘);
    };

    function invokeCallback(callback, error, value) {
        try {
            callback(error, value);
        } catch (e) {
            setImmediate$1(rethrow, e);
        }
    };

    // 重写数组中的slice方法
    function slice(arrayLike, start) {    // arrayLike: 类数组对象  start: 开始位置
        start = start|0;
        var newLen = Math.max(arrayLike.length - start, 0);    // 长度
        var newArr = Array(newLen);    // 创建一个长度为newLen的数组
        for(var idx = 0; idx < newLen; idx++)  {
            newArr[idx] = arrayLike[start + idx];
        };
        return newArr;    // 返回数组
    };

    // 执行一次
    function onlyOnce(fn) {
        return function() {
            if (fn === null) throw new Error("Callback was already called.");    // 回调已被调用
            var callFn = fn;
            fn = null;
            callFn.apply(this, arguments);    //调用callFn 参数就是用户回调函数中的参数
        };
    };

    var waterfall = function(tasks, callback) {    // tasks: 异步函数数组容器, callback: 回调
        callback = once(callback || noop);    // 回调函数
        if (!isArray(tasks)) return callback(new Error(‘First argument to waterfall must be an array of functions‘));    // 第一个参数必须是数组(函数数组)!
        if (!tasks.length) return callback();    // 空数组的话直接调用回调函数(无参数)
        var taskIndex = 0;    // 任务索引

        function nextTask(args) {    // 参数数组
            var task = wrapAsync(tasks[taskIndex++]);    // 数组中的任务
            args.push(onlyOnce(next));    // 把next方法添加到args数组中去
            task.apply(null, args);    // 调用数组中task函数(参数是数组)
        };

        function next(err/*, ...args*/) {    // 其实就是函数参数中的回调函数callback
            if (err || taskIndex === tasks.length) {    // 只要有错误或者函数数组任务都完成了
                return callback.apply(null, arguments);    // 就执行回调
            };
            nextTask(slice(arguments, 1));    // 数组中的函数没循环完且没出错,那就继续调用
        };

        nextTask([]);    // 调用
    };
}());
waterfall函数中有个next方法,其实我们写的回调就是next方法。

好吧,以上代码直接抽取async中的代码,可以直接使用。如果只想要这一个功能的话。
时间: 2025-01-01 04:06:09

async源码学习 - waterfall函数的使用及原理实现的相关文章

async源码学习 - 全部源码

因为工作需要,可能我离前端走远了,偏node方向了.所以异步编程的需求很多,于是乎,不得不带着学习async了. 我有个习惯,用别人的东西之前,喜欢稍微搞明白点,so就带着看看其源码. github: https://github.com/caolan/async 文档:http://caolan.github.io/async/ 里面提供的工具方法,控制流程方法还是很多的.所以需要哪些方法,就看相应的源码. 下面是其全部源码. (function (global, factory) { typ

ThinkPHP源码学习 cookie函数 设置 取值 删除

/** * Cookie 设置.获取.删除 * @param string $name cookie名称 * @param mixed $value cookie值 * @param mixed $option cookie参数 * @return mixed */ 系统内置了一个cookie函数用于支持和简化Cookie的相关操作,该函数可以完成Cookie的设置.获取.删除操作. Cookie设置 cookie('author','津沙港湾','3600'); 执行代码段 $expire =

ThinkPHP源码学习---is_ssl() 函数 判断是否SSL协议

/** * 判断是否SSL协议 * @return boolean */ function is_ssl() { if(isset($_SERVER['HTTPS']) && ('1' == $_SERVER['HTTPS'] ||     'on' == strtolower($_SERVER['HTTPS']))){         return true;     }elseif(isset($_SERVER['SERVER_PORT']) && ('443' == 

ThinkPHP源码学习 redirect函数 URL重定向

/**  * URL重定向  * @param string $url 重定向的URL地址  * @param integer $time 重定向的等待时间(秒)  * @param string $msg 重定向前的提示信息  * @return void  */  $url='http://www.baidu.com';  $time=3; function redirect($url, $time=0, $msg='') { //多行URL地址支持     $url        = st

ThinkPHP源码学习 to_guid_string函数 根据PHP各种类型变量生成唯一标识号

/** * 根据PHP各种类型变量生成唯一标识号 * @param mixed $mix 变量 * @return string */ function to_guid_string($mix) {     if (is_object($mix)) {         return spl_object_hash($mix); //spl_object_hash - 返回指定对象的hash id      } elseif (is_resource($mix)) { //is_resource 

ThinkPHP源码学习 data_to_xml函数 数据转成xml格式

/** * 数据XML编码 * @param mixed  $data 数据 * @param string $item 数字索引时的节点名称 * @param string $id   数字索引key转换为的属性名 * @return string */ function data_to_xml($data, $item='item', $id='id') {     $xml = $attr = '';     foreach ($data as $key => $val) {       

ThinkPHP源码学习 xml_encode函数 数据转成xml格式

/** * XML编码 * @param mixed $data 数据 * @param string $root 根节点名 * @param string $item 数字索引的子节点名 * @param string $attr 根节点属性 * @param string $id   数字索引子节点key转换的属性名 * @param string $encoding 数据编码 * @return string */ function xml_encode($data, $root='thi

jQuery源码学习笔记:扩展工具函数

// 扩展工具函数 jQuery.extend({ // http://www.w3school.com.cn/jquery/core_noconflict.asp // 释放$的 jQuery 控制权 // 许多 JavaScript 库使用 $ 作为函数或变量名,jQuery 也一样. // 在 jQuery 中,$ 仅仅是 jQuery 的别名,因此即使不使用 $ 也能保证所有功能性. // 假如我们需要使用 jQuery 之外的另一 JavaScript 库,我们可以通过调用 $.noC

Hadoop源码学习笔记(1) ——第二季开始——找到Main函数及读一读Configure类

Hadoop源码学习笔记(1) ——找到Main函数及读一读Configure类 前面在第一季中,我们简单地研究了下Hadoop是什么,怎么用.在这开源的大牛作品的诱惑下,接下来我们要研究一下它是如何实现的. 提前申明,本人是一直搞.net的,对java略为生疏,所以在学习该作品时,会时不时插入对java的学习,到时也会摆一些上来,包括一下设计模式之类的.欢迎高手指正. 整个学习过程,我们主要通过eclipse来学习,之前已经讲过如何在eclipse中搭建调试环境,这里就不多述了. 在之前源码初