Python 异步编程实践
学一下 asyncio
的用法. 其实很早就想学学异步相关的接口,
但总是觉得那几个概念有点唬人, 什么 callback, event loop, coroutine...
看起来不太好惹的样子.
但其实没深入到底层实现, 不去看 CPython 源码, 例如 event loop 到底是怎么调度的, 指针是怎么戳来戳去的, 单纯从 API 上看, 感觉也不是那么难懂. 这就是高层次抽象的好呀.
函数 v.s. 协程
- Subroutine vs Coroutine
- routine: 顺序执行的代码片段
- sub: 分出去
- co: 共同执行, "协作"
对应两种编程方式
- Synchronous vs Asynchronous
- 同步编程和异步编程
传统的同步编程:
1 | def f(a, b) -> int: |
- 调用
f(2, 3)
时,main
进入阻塞态 - 会创建属于
f
的栈帧(stack frame)- 栈顶添加一个返回指针, 告诉解释器, 函数返回时, 应该从哪里继续执行
- 函数执行完毕, 解释器释放
f
的栈帧, 并返回到main
的栈帧
大部分编程语言的函数调用都采用这种方式, C 语言就是一个典型案例.
- coroutine
- 封装异步执行的代码段
- task
- 封装协程, 可以包含多个协程
- 异步代码以 Task 的形式去运行
- event loop
- 维护多个 tasks(任务)
- 监听事件
- 像一个调度器, 负责 task 的切换
- 注意 event loop 它只能在一个时间执行一个任务
- 不存在系统级别的上下文切换, 和线程不一样
- 也就不存在 race condition 问题
- 最后效果看起来像是并发的, 但其实只是更好地利用了代码中间的等待时间.
所以如果你要优化的代码里不存在等待这件事的话, 采用 asyncio 就没有意义了.
Python 里的 asyncio
模块提供了异步编程的支持:
def
定义函数, "子程序",Callable
对象async def
定义的"协程", 也是一个Callable
对象- 直接 call 一个协程函数?
- python 返回了一个 coroutine object
- 协程不会直接执行
- 需要被等待
await
- 需要被等待
1 | import asyncio |
- 注意异步代码必须在
async def
定义的协程函数体内编写 - 用
asyncio.run
作为整个异步函数的入口 - awaitable object 包含 coroutine 和 task
- 还有一个
Future
对象, 但通常情况下没有必要在应用层级的代码中创建 Future 对象
- 还有一个
看到这里感觉两种编程方式没啥差别, 异步编程就多了一步要
await
?
- 但注意: coroutine 对象中的异步代码是可以被暂停和恢复的
对比下 time.sleep()
和 asyncio.sleep()
:
1 | import asyncio |
asyncio.gather
: 聚合两个协程 -> 单个协程- 本例中, 即并发地执行两次
call_api
- 本例中, 即并发地执行两次
运行代码时, 你会发现在第一次打印 Hello 之后程序陷入了阻塞状态:
1 | Hello |
等待 3s 之后, 才会继续
1 | Hello |
- 奇怪, 两次
call_api
都在同一时间开始执行, 但实际上是串行执行的? - 原因:
time.sleep
是一个阻塞调用, 它会阻塞事件循环- 在执行过程, 程序转到了一个子程序里, 硬控了 3s
修改下 call_api
里的内容:
1 | async def call_api(): |
1 | Hello |
- 第一个
call_api
调用print
- 第一个进入休眠状态, Event Loop 唤醒第二个
call_api
一个问题:
1 | await asyncio.gather(call_api(), call_api()) |
是否等同于
1 | await call_api() |
- 事实显示, 下面这种写法和用到同步函数的效果是一样的.
- 为什么会出现这样的结果?
- 原因: 没有将协程放到 Event Loop 中
- 在第一个
call_api()
阻塞时, Event Loop 的任务队列中并不包含第二个call_api()
- 就自然不会切换到第二个
call_api()
中继续执行了.
- 在第一个
- 更好的理解: 这两个
call_api
协程放到了同一个 task 中, Event loop 调度不了
解决方案:
1 | task_1 = asyncio.create_task( |
- 使用
asyncio.create_task
作为一种显式地创建 Task 的方式- 之前提到, 异步模型中 Event Loop 负责调度 Task 的执行.
- 上面
asyncio.gather
隐式地将 coroutine 转换成了 task, 同时也注册到 Event Loop 中.
再来看一个例子:
我们去掉上面的 await task, 看看会发生什么
1 | task_1 = asyncio.create_task( |
1 | Hello |
关注几个点:
- 创建了 task, 任务就会追加到 Event Loop 的任务队列中.
- main -> task_1 -> task_2
asyncio.run(main())
调用结束后, Event Loop 也就结束了.- 所以 task_1 和 task_2 并没有被执行完全.
- 而
await task
是在让 Event Loop 等待 Task 执行结束.task_1
说: 等我执行完毕你再走, 但也别干等我, 你看看是不是还有别人要忙.- 本质上是等待 Task 的
done
方法返回 True
asyncio
经过多个版本的迭代, 演变出了高层级的 API,
在以前, 我们还需要手动管理 Event Loop 的运行与停止
1 | import asyncio |
这个例子也能让我们观察到 Event Loop 的运行机制.
应用场景
异步编程很适合解决一些网络通讯上的问题
io bound tasks
- 最常见的一个例子就是 js 里, 前端向后端发送请求 api 会用到的
fetch
- 以及后端:
- 数据库发起查询请求
- 部署模型: AI 模型在后台运行
- 评测平台: 等待评测机运行结果
后端会有所谓异步视图, 例如, 在 FastAPI 框架中用的很多; 在 Django 4.1 版本之后, 也支持了异步查询 ORM 的相关功能.
1 | user = await User.objects.filter(username=my_input).afirst() |
- 在一个协程函数里, 当然不能从异步代码中调用堵塞的同步代码, 它会堵塞上面所述的 Event Loop.
- 所以需要用到 Django 提供的 异步查询 API
- 这里没有用
first
, 而是afirst
, 返回的是一个 Coroutine 对象.
- 这里没有用
再举个例子, OJ 的等待运行结果:
1 | # must wait for judge result |
假设多个用户发起了评测请求 r1, r2, r3, 那显然, 后端要等待评测机评测
r1 之后才能处理 r2 的请求, 为了防止主线程阻塞, 我们就要有意识地让
judge
的行为后台化, 就可以用一些异步框架去处理(例如:
celery, dramatiq, etc.):
1 |
|
具体可以参考多评测机的任务队列实现.
习题
- 直接 await 一个 coroutine object 和 task object 有什么区别?
- 观察下面的异步代码:
1 | import asyncio |
这里的异步是否起到作用?
解答:
- 区别: 是否将这个 awaitable object 注册到 Event Loop 中. 只有 Task 才会被 Event loop 调度. 注意到一个事情: await coroutine 并不会使得这个对象变成 task.
- aiohttp 库的异步代码是没问题的, 问题是 main 里的 for 循环: 相当于 await 了 30 个 fetch 协程, 但实际上并没有并发地让这些协程并发执行, 还是上面提到的那个问题: 没注册到 Event Loop 中. 正确做法是下面这种:
1 | async def main(): |
可以前后对比一下, 正确的异步代码运行时间稳定在 1.25s 左右, 而上面的代码要多上不少.
同时还可以对比下使用 requests 库的代码:
1 | for pid in range(1000, 1030): |
代码运行基本上需要 6s 左右的时间, 差距还是很明显的.