[关闭]
@FunC 2018-01-02T16:31:51.000000Z 字数 7833 阅读 2043

Node.js Design Patterns | CH03~04

使用回调函数的异步流程控制模式

Node.js

CH03 使用callback

本章概要/总结
1. 通过一个爬虫应用介绍了如何用原生 JS 解决回调地狱的问题
2. 顺序执行、顺序遍历模式
3. 并发执行、限制并发数的模式
4. 介绍 async 库

异步编程的难点

在编写异步代码时,闭包和内联匿名函数确保了流畅的编码体验,符合KISS原则。
然而,这牺牲了部分的模块性、可复用性和可维护性。随着嵌套的回调函数越来越多,代码将越发容易失去控制。
很多时候我们创建的闭包是多余的,所以实际上这更多的是一个编码规范的问题。能否意识到代码即将变得笨重,是新手和专家的主要区别。

回调地狱

形如以下格式的代码被称为回调地狱

  1. asyncFoo( err => {
  2. asyncBar( err => {
  3. asyncFooBar( err => {
  4. // …
  5. });
  6. });
  7. });

回调地狱会导致一系列的问题:
1. 代码可读性差
2. 代码难以组织
3. 很难追踪一个函数在哪里结束,下一个函数从何处开始
4. 变量名容易被覆盖

使用原生 JavaScript 编写异步代码

回调函数使用准则

  1. 尽快结束。使用return, continue, break来立即结束当前声明。
  2. 使用具名函数。让其远离闭包,并将中间结果作为其参数。另外,使用具名函数还有助于栈追踪。
  3. 尽可能模块化代码。尽可能地将代码分割成更小,可复用的函数。

实践

  1. // before
  2. if (err) {
  3. callback(err);
  4. } else {
  5. // code to execute when there are no errors
  6. }
  7. // after
  8. if (err) {
  9. return callback(err);
  10. }
  11. // code to execute when there are no errors

注意不用在意此处return的值,真正的结果是异步产生并传递给回调函数的。当然,为了语义更加清晰,可以进一步写成:

  1. callback(…);
  2. return;

顺序执行(Sequential execution)

顺序执行意味着每次只执行一个任务,一个接着下一个。其执行顺序必须确保,因为任务执行的结果可能会影响到下一个任务。
使用异步CPS模式书写顺序执行代码时,很容易会导致回调地狱。

顺序执行一系列已知任务

我们可以将代码组织成如下格式:

  1. function task1(callback) {
  2. asyncOperation(() => {
  3. task2(callback);
  4. });
  5. }
  6. function task2(callback) {
  7. asyncOperation(result => {
  8. task3(callback);
  9. });
  10. }
  11. function task3(callback) {
  12. asyncOperation(() => {
  13. callback(); // finally execute the callback pass from task1;
  14. });
  15. }
  16. task1(() => {
  17. // callback to be executed when task1, task2 and task3 are completed
  18. console.log('task1,2 and 3 executed!');
  19. });

这段代码的侧重点在于将任务模块化。同时为我们展示了在编写异步代码时不总是需要使用闭包。

顺序遍历

上述的模式适用于已知任务数量的情况,在不清楚任务数量的情况下,我们不能硬编码任务队列。
下面给出一种“顺序遍历模式”(sequential iterator pattern)来解决这种情况:

  1. function iterateSeries(collection, iteratorCallback, finalCallback) {
  2. function iterate(index) {
  3. if (index === collection.length) {
  4. return finalCallback();
  5. }
  6. const task = collection[index];
  7. // iteratorCallback must execute after task finish
  8. task(iteratorCallback(iterate, index)); // seems too complex
  9. }
  10. iterate(0);
  11. }
  12. // example of iteratorCallback
  13. function iteratorCallback(iterator, index) {
  14. iterator(index + 1);
  15. }

通过这种方式,我们可以使用generatorpromise模拟async/await

  1. function autorun(gen) {
  2. return new Promise((resolve, reject) => {
  3. var g = gen();
  4. function iterator(execute) {
  5. try {
  6. var yielded = execute();
  7. } catch (err) {
  8. return reject(err);
  9. }
  10. if (yielded.done) {
  11. return resolve(yielded.value);
  12. }
  13. return Promise.resolve(yielded.value)
  14. .then((result) => iterator(() => g.next(result)),
  15. (error) => iterator(() => g.throw(error)));
  16. }
  17. iterator(() => g.next())
  18. });
  19. }

并发执行

当对结果的返回顺序没有要求,只关心任务全部完成的时刻时,可以让其并发(concurrency)执行。根据第一章的知识,单线程的 Node.j 通过 Event Loop 和异步 API 来处理并发

可以想到,实现这种模式只需通过一个计数器,来计算何时全部任务执行完毕即可。

  1. const task = [/* ... */];
  2. let completed = 0; // counter
  3. tasks.forEach(task => {
  4. task(() => {
  5. if (++completed === tasks.length) {
  6. finish();
  7. }
  8. });
  9. });
  10. function finish() { /* execute when all the tasks completed */}

解决并发任务重的竞争情况

尽管 Node.js 时单线程,我们仍可能会遇到竞争的情况。问题源自于异步任务的触发完成之间有时间差
一个简单的例子是:根据 url 下载一个页面,当这个页面已经存在时就跳过不再下载。而检查页面是否已下载的方法是利用fs.readFile 尝试读取该文件。
细心的你可能已经发现,fs.readFile是一个异步 API ,当其执行时,main 函数可能又尝试启动了一次对同一个 url 的下载,此时上一次下载仍未完成,所以出现了重复下载。

解决的方法也很简答,只要在并发任务开始执行时同步的设置一个标记即可:

  1. const spidering = new Map(); // flag
  2. function spider(url, nesting, callback) {
  3. if (spidering.has(url)) {
  4. return process.nextTick(callback);
  5. }
  6. spidering.set(url, true); // set flag
  7. // ...
  8. }

有节制地并发执行

无节制地执行大量并发任务,例如向一个网站发送大量请求。将有可能耗尽对方服务器资源(也被称作DDoS攻击)。所以,我们需要对并发任务设置一个上限:

  1. const tasks = [ /* ... */ ];
  2. let concurrency = 2, running = 0, completed = 0, index = 0;
  3. function next() {
  4. while(running < concurrencu && completed < tasks.length) {
  5. task = tasks[index++];
  6. task(() => {
  7. if (completed === tasks.length) {
  8. finish();
  9. }
  10. completed++;
  11. running--;
  12. next();
  13. }
  14. }
  15. next();

全局性地限制并发数

使用上述方法,只适用于单个任务队列。如果每个任务还会递归地触发更多的任务队列,那么只能保证限制每个任务队列的并发数。于是,我们需要全局的限制并发数的手段。

使用队列

使用下面这种队列结构是其中一种解决方案:

  1. Class TaskQueue {
  2. constructor(concurrency) {
  3. this.concurrency = concurrency;
  4. this.running = 0;
  5. this.queue = [];
  6. }
  7. pushTask(task) {
  8. this.queue.push(task);
  9. this.next();
  10. }
  11. next() {
  12. while(this.running < this.concurrency && this.queue.length) {
  13. const task = this.queue.shift();
  14. task(() => {
  15. this.running—;
  16. this.next();
  17. });
  18. this.running++;
  19. }
  20. }
  21. };

带来的好处是:
1. 能够动态地添加任务到队列
2. 有一个集中控制并发数量的中心

async库

async库是处理异步 JavaScript 代码的优秀解决方案,主要原理与本文提及模式相似,并提供了大量不同功能的 API 任君选择。


CH04 使用ES2015及以后的语法

Promise

什么是Promise

Promise 是一个抽象的概念,它允许函数返回一个名为promise的对象,来代表异步操作的最终结果。在 promise 中,如果一个异步操作未完成,那么它处于pending状态;如果成功完成,则变为fulfilled状态;如果失败,则变为rejected状态。而fulfilledrejected统称为settled状态。
其中 promise 的一个重要的特性,就是then()方法总会返回 promise,所以我们就能够链式调用 promises 了。
此外,promise 最棒的地方在于:如果在onFulfilled()或者onRejected()中抛出异常(使用 throw 抛出),将会自动将异常reject,交给遇到的第一个onRejected()

此前有很多库都实现了自己的 promise,然而它们不通用。为了解决这个问题,社区提出了Promise/A+实现,规定了最基本的then()方法的行为。

将 Node.js 风格函数 Promise 化

将基于回调的函数转化成 promise,这一过程通常被称为promisification

  1. function promisify(callbackBaseApi) {
  2. return function promisified() {
  3. const len = arguments.length;
  4. let args = [];
  5. for(let i = 0; i < len; i++) {
  6. args[i] = arguments[i];
  7. }
  8. return new Promise((resolve, reject) => {
  9. args.push((err, ...result) => {
  10. if (err) {
  11. return reject(err);
  12. }
  13. if (result.length = 1) {
  14. return resolve(result[0]);
  15. }
  16. return resolve(result);
  17. }
  18. callbackBaseApi.call(null, args);
  19. });
  20. }
  21. };

顺序遍历

想使用 promise 进行任务数量不定的顺序遍历,关键在于要利用好 promise 的返回值总是 promise 这一特性:
通过形如tasks.forEach(task => promise = promise.then(() => task()))的遍历操作编可构造出链式 promise
考虑到每次循环都在使用上一轮的结果,我们可以通过reduce()函数来更好地表达这一模式:

  1. let tasks = [ /* ... */ ];
  2. let promise = tasks.reduce((prev, task) => {
  3. return prev.then(() => task());
  4. }, Promise.resolve()); // Promise.resolve() is the first "prev"
  5. promise.then(() => {
  6. // All tasks completed
  7. });

并发执行

使用 promise 处理并发异常便捷,只需使用 Promise.all()即可:
Promise.all()接受一个数组并返回一个 promise。当数组中的全部成员都变成fulfilled状态时,这个 promise 才会被 fulfilled,且值为数组中每一个 promise fulfilled 的值。

限制并发数

很不幸,ES2015 的 Promise API 没有提供原生的限制并发任务的方法。但我们仍可以通过之前的TaskQueue类来实现,只需要稍微修改一下next()函数:

  1. next() {
  2. while(this.running < this.concurrency && this.queue.length) {
  3. const task = this.queue.shift();
  4. // task should be a promise
  5. task().then(() => {
  6. this.running--;
  7. this.next();
  8. });
  9. this.running++;
  10. }
  11. }

公共 API 该使用 callback 还是 promise?

有两种做法:
1. 坚持使用 callback,并提供 promising 的方法
2. 以传统的 callback API 形式提供,当缺少 callback 参数时,返回 promise

Genrator

Generator, 通常也被称为”半协程”。它能生成允许多个入口的子程序(有别于普通函数只能在调用时触发)

利用 generator 进行异步流程控制

思考以下代码:

  1. function asyncFlow(generatorFunction) {
  2. function callback(err) {
  3. if (err) {
  4. return generator.throw(err);
  5. }
  6. var results = [].slice.call(arguments, 1);
  7. generator.next(results.length > 1 ? results : results[0]);
  8. }
  9. var generator = generatorFunction(callback)
  10. generator.next();
  11. }

这样我们就能用同步的写法使用 callback 风格的异步 API 了:

  1. asyncFlow(function* (callback) {
  2. const fileName = path.basename(__filename);
  3. const myself = yield fs.readFile(filename, 'utf-8', callback);
  4. yield fs.writeFile(`clone_of_${filename}`, 'utf-8', callback);
  5. });

此处的 callback 参数容易让人感觉迷惑,我们可以通过 thunk 化来省略 callback 参数。(所谓 thunk 化就是把函数变为只剩下 callback 一个参数的函数):

  1. function asyncFlowWithThunk(generatorFunction) {
  2. function callback(err) {
  3. if (err) {
  4. return generator.throw(err);
  5. }
  6. const results = [].slice.call(arguments,1);
  7. const thunk = generator.next(results.length > 1 ? results : results[0]).value;
  8. // callback was invoke inside the asyncFlowWithThunk function
  9. thunk && thunk(callback);
  10. }
  11. const generator = generatorFunction();
  12. const thunk = generator.next().value;
  13. thunk && thunk(callback);
  14. }

使用 co 库

co 库提供了一系列工具函数,来完成thunk, promisify等工作

使用生产-消费模式来限制并发数

  1. class TaskQueue {
  2. constructor(concurrency) {
  3. this.concurrency = concurrency;
  4. this.taskQueue = [];
  5. this.comsumeQueue = [];
  6. this.running = 0;
  7. this.spawnWorkers(concurrency);
  8. }
  9. pushTask(task) {
  10. // length !== 0 means some worker is idle
  11. if (this.comsumeQueue.length !== 0) {
  12. this.comsumeQueue.shift()(null, task);
  13. } else {
  14. this.taskQueue.push(task);
  15. }
  16. }
  17. spawnWorkers(concurrency) {
  18. const self = this;
  19. // spawn workers
  20. for (let i = 0; i < concurrency; i++) {
  21. co(function* () {
  22. while (true) {
  23. // get task and run
  24. const task = yield self.nextTask();
  25. yield task;
  26. }
  27. });
  28. }
  29. }
  30. nextTask() {
  31. return callback => {
  32. if (this.taskQueue.length !== 0) {
  33. return callback(null, this.taskQueue.shift());
  34. }
  35. // if no task in queue, push callback into comsumeQueue indicate that some workers are idle
  36. this.comsumeQueue.push(callback);
  37. }
  38. }
  39. }

通过 Babel 提前使用 async/await

尽管巧妙利用 generator 来书写异步代码很方便,但是毕竟 generator 还是主要用来遍历的。用来写异步代码有时容易让人疑惑,这时我们可以寄望于ES2017的 async/await:

  1. async function main() {
  2. const html = await getPageHtml("www.baidu.com");
  3. console.log(html);
  4. }

比较

以上介绍了使用ES2015语法来处理异步流程的多种方法,下面简要列出各种方法的优缺点,以供参考:
1. 原生JS
优点:不依赖库,兼容性好,通常情况下性能最好,允许使用更高级的算法。
缺点:需要书写更多代码,需要书写相对复杂的代码

  1. Async 库
    优点:简化了常用的异步流程控制函数,callback 风格,性能好
    缺点: 引入额外的依赖,不适用于部分复杂流程

  2. Promise
    优点: 大幅简化常见异步流程,错误处理机制健壮
    缺点: 对于 callback 风格的 API 需要进行 promisify,耗费部分性能

  3. Generator
    优点:可以以同步风格书写异步代码,简化错误处理
    缺点:需要配合相应的库使用,仍需要使用 callback 和 promise, 对于非generator的 API 需要进行 thunk 化或 promisify

  4. Async/await
    优点:同步书写异步代码,语法简洁清晰
    缺点:仍未被原生支持(Node 8已支持),需要配合Babel才能使用

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注