[关闭]
@noogel 2017-11-16T17:25:49.000000Z 字数 5478 阅读 384

Tornado 异步非阻塞浅析(一)


[以下代码基于 Tornado 3.2.1 版本讲解]
[主要目标:讲解 gen.coroutine、Future、Runner 之间的关系]

这里是示例运行代码

  1. #!/usr/bin/python
  2. # coding: utf-8
  3. """
  4. File: demo.py
  5. Author: zhangxu01 <zhangxu01@zhihu.com>
  6. Date: 2017-08-28 22:59
  7. Description: demo
  8. """
  9. import tornado
  10. from tornado import gen, web
  11. @gen.coroutine
  12. def service_method():
  13. raise gen.Return("abc")
  14. class NoBlockHandler(tornado.web.RequestHandler):
  15. @web.asynchronous
  16. @gen.coroutine
  17. def get(self):
  18. result = yield service_method()
  19. self.write(result)
  20. self.finish()
  21. class Application(tornado.web.Application):
  22. def __init__(self):
  23. settings = {
  24. "xsrf_cookies": False,
  25. }
  26. handlers = [
  27. (r"/api/noblock", NoBlockHandler),
  28. ]
  29. tornado.web.Application.__init__(self, handlers, **settings)
  30. if __name__ == "__main__":
  31. Application().listen(2345)
  32. tornado.ioloop.IOLoop.instance().start()

演示运行效果...

讲解从 coroutine 修饰器入手,这个函数实现了简单的异步,它通过 generator 中的 yield 语句使函数暂停执行,将中间结果临时保存,然后再通过 send() 函数将上一次的结果送入函数恢复函数执行。

  1. def coroutine(func):
  2. @functools.wraps(func)
  3. def wrapper(*args, **kwargs):
  4. future = TracebackFuture()
  5. if 'callback' in kwargs:
  6. print("gen.coroutine callback:{}".format(kwargs['callback']))
  7. callback = kwargs.pop('callback')
  8. IOLoop.current().add_future(
  9. future, lambda future: callback(future.result()))
  10. try:
  11. print("gen.coroutine run func:{}".format(func))
  12. result = func(*args, **kwargs)
  13. except (Return, StopIteration) as e:
  14. result = getattr(e, 'value', None)
  15. except Exception:
  16. future.set_exc_info(sys.exc_info())
  17. return future
  18. else:
  19. if isinstance(result, types.GeneratorType):
  20. def final_callback(value):
  21. deactivate()
  22. print("gen.coroutine final set_result:{}".format(value))
  23. future.set_result(value)
  24. print("gen.coroutine will Runner.run() result:{}".format(result))
  25. runner = Runner(result, final_callback)
  26. runner.run()
  27. return future
  28. print("@@ gen.coroutine will set_result and return:{}".format(result))
  29. future.set_result(result)
  30. return future
  31. return wrapper
Created with Raphaël 2.1.2create future objectrun functionis not exceptionis generatorRunner.run()return futureEndyesnoyesno

首先创建一个Future实例,然后执行被修饰的函数,一般函数返回的是一个生成器对象,接下来交由 Runner 处理,如果函数返回的是 Return, StopIteration 那么表示函数执行完成将结果放入 future 中并 set_done() 返回。

下面是Future的简版:

  1. class Future(object):
  2. def __init__(self):
  3. self._result = None
  4. self._callbacks = []
  5. def result(self, timeout=None):
  6. self._clear_tb_log()
  7. if self._result is not None:
  8. return self._result
  9. if self._exc_info is not None:
  10. raise_exc_info(self._exc_info)
  11. self._check_done()
  12. return self._result
  13. def add_done_callback(self, fn):
  14. if self._done:
  15. fn(self)
  16. else:
  17. self._callbacks.append(fn)
  18. def set_result(self, result):
  19. self._result = result
  20. self._set_done()
  21. def _set_done(self):
  22. self._done = True
  23. for cb in self._callbacks:
  24. try:
  25. cb(self)
  26. except Exception:
  27. app_log.exception('Exception in callback %r for %r', cb, self)
  28. self._callbacks = None

在tornado中大多数的异步操作返回一个Future对象,这里指的是 Runner 中处理的异步返回结果。我们可以将该对象抽象成一个占位对象,它包含很多属性和函数。一个 Future 对象一般对应这一个异步操作。当这个对象的异步操作完成后会通过 set_done() 函数去处理 _callbacks 中的回调函数,这个回调函数是在我们在做修饰定义的时候传入 coroutine 中的。

下面的代码是在 coroutine 中定义的,用来添加对异步操作完成后的回调处理。

  1. if 'callback' in kwargs:
  2. print("gen.coroutine callback:{}".format(kwargs['callback']))
  3. callback = kwargs.pop('callback')
  4. IOLoop.current().add_future(
  5. future, lambda future: callback(future.result()))

这里是 IOLoop 中的 add_future 函数,它是来给 future 对象添加回调函数的。

  1. def add_future(self, future, callback):
  2. assert isinstance(future, Future)
  3. callback = stack_context.wrap(callback)
  4. future.add_done_callback(
  5. lambda future: self.add_callback(callback, future))

然后说 Runner 都做了什么。在 3.2.1 版本中 Runner 的作用更重要一些。那么 Runner() 的作用是什么?
它主要用来控制生成器的执行与终止,将异步操作的结果 send() 至生成器暂停的地方恢复执行。在生成器嵌套的时候,当 A 中 yield B 的时候,先终止 A 的执行去执行 B,然后当 B 执行结束后将结果 send 至 A 终止的地方继续执行 A。

  1. class Runner(object):
  2. def __init__(self, gen, final_callback):
  3. self.gen = gen
  4. self.final_callback = final_callback
  5. self.yield_point = _null_yield_point
  6. self.results = {}
  7. self.running = False
  8. self.finished = False
  9. def is_ready(self, key):
  10. if key not in self.pending_callbacks:
  11. raise UnknownKeyError("key %r is not pending" % (key,))
  12. return key in self.results
  13. def set_result(self, key, result):
  14. self.results[key] = result
  15. self.run()
  16. def pop_result(self, key):
  17. self.pending_callbacks.remove(key)
  18. return self.results.pop(key)
  19. def run(self):
  20. try:
  21. self.running = True
  22. while True:
  23. next = self.yield_point.get_result()
  24. self.yield_point = None
  25. try:
  26. print("gen.Runner.run() will send(next)")
  27. yielded = self.gen.send(next)
  28. print("gen.Runner.run() send(next) done.")
  29. except (StopIteration, Return) as e:
  30. print("gen.Runner.run() send(next) throw StopIteration or Return done.")
  31. self.finished = True
  32. self.yield_point = _null_yield_point
  33. self.final_callback(getattr(e, 'value', None))
  34. self.final_callback = None
  35. return
  36. if isinstance(yielded, (list, dict)):
  37. yielded = Multi(yielded)
  38. elif isinstance(yielded, Future):
  39. yielded = YieldFuture(yielded)
  40. self.yield_point = yielded
  41. self.yield_point.start(self)
  42. finally:
  43. self.running = False
  44. def result_callback(self, key):
  45. def inner(*args, **kwargs):
  46. if kwargs or len(args) > 1:
  47. result = Arguments(args, kwargs)
  48. elif args:
  49. result = args[0]
  50. else:
  51. result = None
  52. self.set_result(key, result)
  53. return wrap(inner)

实例化 Runner() 的时候将生成器对象和生成器执行结束时的回调函数传入,然后通过 run() 函数去继续执行生成器对象。

run() 函数的处理首先包了一层 while 循环,因为在生成器对象中可能包含多个 yield 语句。

yielded = self.gen.send(next),在第一次 send() 恢复执行的时候默认传入 None ,因为函数第一次执行并没有结果。然后将第二次执行的结果 yielded (返回的是一个 Future 对象),包装成一个 YieldFuture 对象,然后通过 start() 函数处理:

  1. def start(self, runner):
  2. if not self.future.done():
  3. self.runner = runner
  4. self.key = object()
  5. self.io_loop.add_future(self.future, runner.result_callback(self.key))
  6. else:
  7. self.runner = None
  8. self.result = self.future.result()

首先判断 future 是否被 set_done(),如果没有则注册一系列回调函数,如果完成则保存结果,以供下一次恢复执行时将结果送入生成器。
在 Runner.run() 执行完成后此时的 coroutine 中的 future 对象已经是被 set_done 的,然后直接返回 future 对象,最后被 外层的 @web.asynchronous 修饰器消费。


http://www.cnblogs.com/MnCu8261/p/6560502.html
https://www.cnblogs.com/chenchao1990/p/5406245.html
http://blog.csdn.net/u010168160/article/details/53019039
https://www.cnblogs.com/yezuhui/p/6863781.html
http://blog.csdn.net/zhaohongyan6/article/details/70888221

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注