Python 异步编程实践

学一下 asyncio 的用法. 其实很早就想学学异步相关的接口, 但总是觉得那几个概念有点唬人, 什么 callback, event loop, coroutine... 看起来不太好惹的样子.

但其实没深入到底层实现, 不去看 CPython 源码, 例如 event loop 到底是怎么调度的, 指针是怎么戳来戳去的, 单纯从 API 上看, 感觉也不是那么难懂. 这就是高层次抽象的好呀.

函数 v.s. 协程

  • Subroutine vs Coroutine
    • routine: 顺序执行的代码片段
    • sub: 分出去
    • co: 共同执行, "协作"

对应两种编程方式

  • Synchronous vs Asynchronous
    • 同步编程和异步编程

传统的同步编程:

1
2
3
4
5
6
7
8
9
def f(a, b) -> int:
return a + b


def main():
print(f(2, 3))


main()
  • 调用 f(2, 3) 时, main 进入阻塞态
  • 会创建属于 f 的栈帧(stack frame)
    • 栈顶添加一个返回指针, 告诉解释器, 函数返回时, 应该从哪里继续执行
    • 函数执行完毕, 解释器释放 f 的栈帧, 并返回到 main 的栈帧
子程序

大部分编程语言的函数调用都采用这种方式, C 语言就是一个典型案例.

而协程采用的模型:
  • coroutine
    • 封装异步执行的代码段
  • task
    • 封装协程, 可以包含多个协程
    • 异步代码以 Task 的形式去运行
  • event loop
    • 维护多个 tasks(任务)
    • 监听事件
    • 像一个调度器, 负责 task 的切换
模式对比
  • 注意 event loop 它只能在一个时间执行一个任务
    • 不存在系统级别的上下文切换, 和线程不一样
    • 也就不存在 race condition 问题
  • 最后效果看起来像是并发的, 但其实只是更好地利用了代码中间的等待时间.

所以如果你要优化的代码里不存在等待这件事的话, 采用 asyncio 就没有意义了.

Python 里的 asyncio 模块提供了异步编程的支持:

  • def 定义函数, "子程序", Callable 对象
  • async def 定义的"协程", 也是一个 Callable 对象
    • 直接 call 一个协程函数?
    • python 返回了一个 coroutine object
  • 协程不会直接执行
    • 需要被等待 await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio


async def add(a, b) -> int:
return a + b


async def main():
# <coroutine object add at 0x7f341e35f370>
# awaitable object
print(add(2, 3))

res = await add(2, 3)
# 5
print(res)


asyncio.run(main())
  • 注意异步代码必须在 async def 定义的协程函数体内编写
  • asyncio.run 作为整个异步函数的入口
  • awaitable object 包含 coroutine 和 task
    • 还有一个 Future 对象, 但通常情况下没有必要在应用层级的代码中创建 Future 对象

看到这里感觉两种编程方式没啥差别, 异步编程就多了一步要 await ?

  • 但注意: coroutine 对象中的异步代码是可以被暂停和恢复

对比下 time.sleep()asyncio.sleep():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import time


async def call_api():
print('Hello')
time.sleep(3)
print('World')


async def main():
start = time.perf_counter()
await asyncio.gather(call_api(), call_api())
end = time.perf_counter()
print(f'It took {end - start} second(s) to complete.')

asyncio.run(main())
  • asyncio.gather: 聚合两个协程 -> 单个协程
    • 本例中, 即并发地执行两次 call_api

运行代码时, 你会发现在第一次打印 Hello 之后程序陷入了阻塞状态:

1
Hello

等待 3s 之后, 才会继续

1
2
3
4
5
Hello
World
Hello
World
It took 6.006541083002958 second(s) to complete.
  • 奇怪, 两次 call_api 都在同一时间开始执行, 但实际上是串行执行的?
  • 原因: time.sleep 是一个阻塞调用, 它会阻塞事件循环
    • 在执行过程, 程序转到了一个子程序里, 硬控了 3s
time.sleep()

修改下 call_api 里的内容:

1
2
3
4
async def call_api():
print('Hello')
await asyncio.sleep(3)
print('World')
1
2
3
4
5
Hello
Hello
World
World
It took 3.0044386359986675 second(s) to complete.
asyncio.sleep()
  • 第一个 call_api 调用 print
  • 第一个进入休眠状态, Event Loop 唤醒第二个 call_api

一个问题:

1
await asyncio.gather(call_api(), call_api())

是否等同于

1
2
await call_api()
await call_api()
  • 事实显示, 下面这种写法和用到同步函数的效果是一样的.
    • 为什么会出现这样的结果?
  • 原因: 没有将协程放到 Event Loop 中
    • 在第一个 call_api() 阻塞时, Event Loop 的任务队列中并不包含第二个 call_api()
    • 就自然不会切换到第二个 call_api() 中继续执行了.
  • 更好的理解: 这两个 call_api 协程放到了同一个 task 中, Event loop 调度不了

解决方案:

1
2
3
4
5
6
7
8
task_1 = asyncio.create_task(
call_api()
)
task_2 = asyncio.create_task(
call_api()
)
await task_1
await task_2
  • 使用 asyncio.create_task 作为一种显式地创建 Task 的方式
    • 之前提到, 异步模型中 Event Loop 负责调度 Task 的执行.
  • 上面 asyncio.gather 隐式地将 coroutine 转换成了 task, 同时也注册到 Event Loop 中.

再来看一个例子:

我们去掉上面的 await task, 看看会发生什么

1
2
3
4
5
6
7
task_1 = asyncio.create_task(
call_api()
)
task_2 = asyncio.create_task(
call_api()
)
await asyncio.sleep(1)
1
2
3
Hello
Hello
It took 1.0013072900001134 second(s) to complete.

关注几个点:

  • 创建了 task, 任务就会追加到 Event Loop 的任务队列中.
    • main -> task_1 -> task_2
    • asyncio.run(main()) 调用结束后, Event Loop 也就结束了.
    • 所以 task_1 和 task_2 并没有被执行完全.
  • await task 是在让 Event Loop 等待 Task 执行结束.
    • task_1 说: 等我执行完毕你再走, 但也别干等我, 你看看是不是还有别人要忙.
    • 本质上是等待 Task 的 done 方法返回 True

asyncio 经过多个版本的迭代, 演变出了高层级的 API, 在以前, 我们还需要手动管理 Event Loop 的运行与停止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

def hello_world(loop):
"""A callback to print 'Hello World' and stop the event loop"""
print('Hello World')
loop.stop()

loop = asyncio.new_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()

这个例子也能让我们观察到 Event Loop 的运行机制.

应用场景

异步编程很适合解决一些网络通讯上的问题

io bound tasks

  • 最常见的一个例子就是 js 里, 前端向后端发送请求 api 会用到的 fetch
  • 以及后端:
    • 数据库发起查询请求
    • 部署模型: AI 模型在后台运行
    • 评测平台: 等待评测机运行结果

后端会有所谓异步视图, 例如, 在 FastAPI 框架中用的很多; 在 Django 4.1 版本之后, 也支持了异步查询 ORM 的相关功能.

1
user = await User.objects.filter(username=my_input).afirst()
  • 在一个协程函数里, 当然不能从异步代码中调用堵塞的同步代码, 它会堵塞上面所述的 Event Loop.
  • 所以需要用到 Django 提供的 异步查询 API
    • 这里没有用 first, 而是 afirst, 返回的是一个 Coroutine 对象.

再举个例子, OJ 的等待运行结果:

1
2
3
# must wait for judge result
data = client.judge(code)
return JudgeStatus.PENDING

假设多个用户发起了评测请求 r1, r2, r3, 那显然, 后端要等待评测机评测 r1 之后才能处理 r2 的请求, 为了防止主线程阻塞, 我们就要有意识地让 judge 的行为后台化, 就可以用一些异步框架去处理(例如: celery, dramatiq, etc.):

1
2
3
4
5
6
7
8
@dramatiq.actor
def judge(code):
# ...

# view function
# ...
client.judge.send(code)
return JudgeStatus.PENDING

具体可以参考多评测机的任务队列实现.

习题

  1. 直接 await 一个 coroutine object 和 task object 有什么区别?
  2. 观察下面的异步代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import aiohttp
from typing import Coroutine


URL = "https://acm.hdu.edu.cn/showproblem.php"


async def fetch(session, url, params) -> Coroutine:
async with session.get(url, params=params) as response:
return await response.text()


async def main():
async with aiohttp.ClientSession() as session:
for pid in range(1000, 1030):
await fetch(session, URL, {'pid': pid})

if __name__ == '__main__':
asyncio.run(main())

这里的异步是否起到作用?

解答:

  1. 区别: 是否将这个 awaitable object 注册到 Event Loop 中. 只有 Task 才会被 Event loop 调度. 注意到一个事情: await coroutine 并不会使得这个对象变成 task.
  2. aiohttp 库的异步代码是没问题的, 问题是 main 里的 for 循环: 相当于 await 了 30 个 fetch 协程, 但实际上并没有并发地让这些协程并发执行, 还是上面提到的那个问题: 没注册到 Event Loop 中. 正确做法是下面这种:
1
2
3
4
5
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, URL, {'pid': pid})
for pid in range(1010, 1030)]
await asyncio.gather(*tasks)

可以前后对比一下, 正确的异步代码运行时间稳定在 1.25s 左右, 而上面的代码要多上不少.

同时还可以对比下使用 requests 库的代码:

1
2
for pid in range(1000, 1030):
response = requests.get(URL, params={'pid': pid})

代码运行基本上需要 6s 左右的时间, 差距还是很明显的.

参考资料

  1. Django 异步支持
  2. 编写 asyncio 代码
  3. Python 官方文档 asyncio 模块