本文章信息量较大,从 IO 多路复用,到生成器的使用,再到 async、await 背后的实现原理,深入浅出,剖析得非常透彻,非常硬核!
tornado
特有的 feature
,现在已经有 async
、await
关键字支持了。思考了一下其实现,回顾了下这些年的演变,觉得还有点意思。都是单线程,为什么原来低效率的代码用了 async
、await
加一些异步库就变得效率高了?
0x00 开始之前
generator
的概念。0x01 IO 多路复用
0x02
。# 处理请求
pass
# 你的 handler 运行在 while 循环中
while True:
# 获取一个新请求
request = accept()
# 根据路由映射获取到用户写的业务逻辑函数
handler = get_handler(request)
# 运行用户的handler,处理请求
handler(request)
handler
,在接收到请求后需要一个 API 调用才能响应结果。handler
的运行放到其他线程上,每个线程处理一个请求,本线程阻塞不影响新请求进入。这能一定程度上解决问题,但对于并发比较大的系统,过多线程调度会带来很大的性能开销。# 向操作系统IO注册自己关注的IO操作的id和类型
io_register(io_id, io_type)
io_register(io_id, io_type)
# 获取完成的IO操作
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ:
data = read_data(io_id)
elif io_type == WRITE:
write_data(io_id,data)
def handler(req):
# do jobs here
io_register(io_id, io_type)
def call_back(result):
# 使用返回的result完成剩余工作...
call_backs[io_id] = call_back
# 新的循环
while True:
# 获取已经完成的io事件
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ: # 读取
data = read(io_id)
call_back = call_backs[io_id]
call_back(data)
else:
# 其他类型io事件的处理
pass
# 获取一个新请求
request = accept()
# 根据路由映射获取到用户写的业务逻辑函数
handler = get_handler(request)
# 运行用户的handler,处理请求
handler(request)
handler
对于 IO 操作,注册了回调就立刻返回,同时每次迭代都会对已完成的 IO 执行回调,网络请求不再阻塞整个服务器。call_backs
字典拆分到单独模块,就能得到一个 EventLoop
,也就是 Python 标准库 asyncio
包中提供的 ioloop
。0x02 用生成器消除 callback
handler
函数,在有独立的 ioloop
后,它现在变成类似这样:# 业务逻辑代码...
# 需要执行一次 API 请求
def call_back(result):
# 使用 API 返回的result完成剩余工作
print(result)
# 没有io_call这个方法,这里只是示意,表示注册一个IO操作
asyncio.get_event_loop().io_call(api, call_back)
value = yield 2
print("get", value)
return value
g = example()
# 启动生成器,我们会得到 2
got = g.send(None)
print(got) # 2
try:
# 再次启动 会显示 "get 4", 就是我们传入的值
got = g.send(got*2)
except StopIteration as e:
# 生成器运行完成,将会print(4),e.value 是生成器return的值
print(e.value)
yield
关键字,调用函数将会得到一个生成器,生成器一个关键的方法 send()
可以跟生成器交互。g.send(None)
会运行生成器内代码直到遇到 yield
,并返回其后的对象,也就是 2
,生成器代码就停在这里了,直到我们再次执行 g.send(got*2)
,会把 2*2
也就是 4
赋值给yield
前面的变量 value
,然后继续运行生成器代码。yield
在这里就像一扇门,可以把一件东西从这里送出去,也可以把另一件东西拿进来。send
让生成器运行到下一个 yield
前就结束了,send
调用会引发一个特殊的异常StopIteration
,这个异常自带一个属性 value
,为生成器 return
的值。handler
用 yield
关键字转换成一个生成器,运行它来把 IO 操作的具体内容返回,IO 完成后的回调函数中把 IO 结果放回并恢复生成器运行,那就解决了业务代码不流畅的问题了:# 业务逻辑代码...
# 需要执行一次 API 请求,直接把 IO 请求信息yield出去
result = yield io_info
# 使用 API 返回的result完成剩余工作
print(result)
# 这个函数注册到ioloop中,用来当有新请求的时候回调
def on_request(request):
# 根据路由映射获取到用户写的业务逻辑函数
handler = get_handler(request)
g = handler(request)
# 首次启动获得io_info
io_info = g.send(None)
# io完成回调函数
def call_back(result):
# 重新启动生成器
g.send(result)
asyncio.get_event_loop().io_call(io_info, call_back)
handler
代码已经不会被打散到 callback
中,on_request
函数使用 callback
和 ioloop
交互,但它会被实现在 Web 框架中,对用户不可见。callback
的启发,但局限性有两点:-
业务逻辑中仅发起一次网络 IO,但实际中往往更多 -
业务逻辑没有调用其他异步函数(协程),但实际中我们往往会调用其他协程
0x03 解决完整调用链
request
执行真正的 IO,func1
、func2
仅调用。显然我们的代码只能写成这样:ret = yield request("http://test.com/foo")
ret = yield func2(ret)
return ret
def func2(data):
result = yield request("http://test.com/"+data)
return result
def request(url):
# 这里模拟返回一个io操作,包含io操作的所有信息,这里用字符串简化代替
result = yield "iojob of %s" % url
return result
request
,我们把 IO 操作通过 yield
暴露给框架。func1
和 func2
,调用 request
显然也要加 yield
关键字,否则 request
调用返回一个生成器后不会暂停,继续执行后续逻辑显然会出错。yield from
、aysnc
、await
时代,在 tornado
框架中写异步代码的样子。-
调用 func1()
得到生成器 -
调用 send(None)
启动它得到会得到request("http://test.com/foo")
的结果,还是生成器对象 -
send(None)
启动由request()
产生的生成器,会得到 IO 操作,由框架注册到ioloop
并指定回调 -
IO 完成后的回调函数内唤醒 request
生成器,生成器会走到return
语句结束 -
捕获异常得到 request
生成器的返回值,将上一层func1
唤醒,同时又得到func2()
生成器 -
继续执行...
send
就能不断得到所有 IO 操作信息并推动调用链前进,实现方法如下:-
第一个生成器入栈 -
调用 send
,如果得到生成器就入栈并进入下一轮迭代 -
遇到到 IO 请求 yield
出来,让框架注册到ioloop
-
IO 操作完成后被唤醒,缓存结果并出栈,进入下一轮迭代,目的让上层函数使用 IO 结果恢复运行 -
如果一个生成器运行完毕,也需要和4一样让上层函数恢复运行
send
,就能整个调用链中的 IO,完成这些 IO,继续推动调用链内的逻辑执行,直到整体逻辑结束:# 第一层调用 入栈
stack = Stack()
stack.push(gen)
# 开始逐层调用
while True:
# 获取栈顶元素
item = stack.peak()
result = None
# 生成器
if isgenerator(item):
try:
# 尝试获取下层调用并入栈
child = item.send(result)
stack.push(child)
# result 使用过后就还原为None
result = None
# 入栈后直接进入下次循环,继续向下探索
continue
except StopIteration as e:
# 如果自己运行结束了,就暂存result,下一步让自己出栈
result = e.value
else: # IO 操作
# 遇到了 IO 操作,yield 出去,IO 完成后会被用 IO 结果唤醒并暂存到 result
result = yield item
# 走到这里则本层已经执行完毕,出栈,下次迭代将是调用链上一层
stack.pop()
# 没有上一层的话,那整个调用链都执行完成了,return
if stack.empty():
print("finished")
return result
# 将会得到 "iojob of http://test.com/foo"
w.send(None)
# 上个iojob foo 完成后的结果"bar"传入,继续运行,得到 "iojob of http://test.com/bar"
w.send("bar")
# 上个iojob bar 完成后的结构"barz"传入,继续运行,结束。
w.send("barz")
ready = []
def on_request(request):
handler = get_handler(request)
# 使用 wrapper 包装后,可以只通过 send 处理 IO 了
g = wrapper(func1())
# 把开始状态直接视为结果为None的就绪状态
ready.append((g, None))
# 让ioloop每轮循环都执行此函数,用来处理的就绪的IO
def process_ready(self):
def call_back(g, result):
ready.append((g, result))
# 遍历所有已经就绪生成器,将其向下推进
for g, result in self.ready:
# 用result唤醒生成器,并得到下一个io操作
io_job = g.send(result)
# 注册io操作 完成后把生成器加入就绪列表,等待下一轮处理
asyncio.get_event_loop().io_call(
io_job, lambda result: ready.append((g, result)
ioloop
每轮迭代都来扫一遍,推动就绪的状态的生成器向下运行,并把新的 IO 操作注册,IO 完成后再次加入就绪,经过几轮 ioloop
的迭代一个 handler
最终会被执行完成。0x04 提高扩展性
sleep
几秒,用 time.sleep()
又会让整个线程阻塞住,就需要特殊实现。再比如,可以把一些 CPU 密集的操作通过多线程异步化,让另一个线程通知事件已经完成后再执行后续。ioloop
来注册 IO 事件还是开一个线程完全由你自己,并提供了一个标准「占位符」Future
,表示他的结果等到未来才会有,其部分原型如下:# 设置结果
def set_result(result): pass
# 获取结果
def result(): pass
# 表示这个future对象是不是已经被设置过结果了
def done(): pass
# 设置在他被设置结果时应该执行的回调函数,可以设置多个
def add_done_callback(callback): pass
Future
,让扩展性变得更强。对于用户代码的中的网络请求函数 request
:def request(url):
# future 理解为占位符
fut = Future()
def callback(result):
# 当网络IO完成回调的时候给占位符赋值
fut.set_result(result)
asyncio.get_event_loop().io_call(url, callback)
# 返回占位符
return future
request
不再是一个生成器,而是直接返回 future
。def callback(fut):
# future被设置结果会被放入就绪列表
ready.append((g, fut.result()))
# 遍历所有已经就绪生成器,将其向下推进
for g, result in self.ready:
# 用result唤醒生成器,得到的不再是io操作,而是future
fut = g.send(result)
# future被设置结果的时候会调用callback
fut.add_done_callback(callback)
0x05 发展和变革
tornado
的时候,大概只有一个 yield
关键字可用,协程要想实现,就是这么个思路,甚至 yield
关键字和 return
关键字不能一个函数里面出现,你要想在生成器运行完后返回一个值,需要手动 raise
一个异常,虽然效果跟现在 return
一样,但写起来还是很别扭,不优雅。yield from
表达式。它可以做什么?wrapper
所做的事:通过栈实现调用链遍历的 ,它是 wrapper
逻辑的语法糖。# 注意 yield from
ret = yield from request("http://test.com/foo")
# 注意 yield from
ret = yield from func2(ret)
return ret
def func2(data):
# 注意 yield from
result = yield from request("http://test.com/"+data)
return result
# 现在 request 函数,不是生成器,它返回future
def request(url):
# 同上基于future实现的request
wrapper
函数了:# 返回第一个请求的 future
g.send(None)
# 继续运行,自动进入func2 并得到第它里面的那个future
g.send("bar")
# 继续运行,完成调用链剩余逻辑,抛出StopIteration异常
g.send("barz")
yield from
直接打通了整个调用链,已经是很大的进步了,但是用来异步编程看着还是别扭,其他语言都有专门的协程 async
、await
关键字了,直到再后来的版本把这些内容用专用的 async
、await
关键字包装,才成为今天比较优雅的样子。0x06 总结和比较
-
基于 IO 多路复用技术,让整个应用在 IO 上非阻塞,实现高效率 -
通过生成器让分散的 callback
代码变成同步代码,减少业务编写困难
Future
类比 Promise
本质相同。gevent
算作一类,都是自己实现 runtime
,并 patch
掉系统调用接入自己的 runtime
,自己来调度协程,gevent
专注于网络相关,基于网络 IO 调度,比较简单,而 Go 实现了完善的多核支持,调度更加复杂和完善,而且创造了基于 channel
新编程范式。本文为原创文章,版权归知行编程网所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ python如何判断一个变量是否为none?11/03
- ♥ python如何编写多行字符串09/13
- ♥ 包是如何嵌套在 Python 中的10/04
- ♥ 什么是 Python 中的有序字典11/12
- ♥ 如何在python中使用wraps函数?12/16
- ♥ 如何正确使用Pandas库提升项目的运行速度?05/18
内容反馈