Coroutine
Table of Contents
1. Coroutine
1.1. futures ARCHIVE
1.2. python generator
python 的 yield 是一个 generator.
def get_next(): i = 0; while (true): yield i i++ for i in get_next(): print(i) # 输出 012345...
实际上 get_next() 返回了一个 generator, 上面的代码类似于:
gen = get_next(); while a = gen.resume(): print(a)
generator yield 会把控制交回给调用者, 并通过一定方式保存了当前的上下文 (局部变量, 指令记数), 下一次 resume 时, 会恢复保存的上下文并直接从上次 yield 的地方开始执行
在实现上, 根据保存上下文的方式, 有两种方式来实现 generator:
stackful
每次 yield 时直接保存了当前的栈, resume 时恢复保存的栈. 这种方式比较简单直观, 但代价较大, 常用的手段有 setjump/longjump 和 makecontext/swapcontext
stackless
使用一个单独的对象来维护相关局部变量和状态, 例如:
class GetNext(object): def __init__(self): self.i = 0 def resume(): while (true): if self.yield_point == YIELD1: ... self.yield_point = YIELD2; return ... if self.yield_point == YIELD2: ... self.yield_point = ... return ... ...
stackless 的 generator 需要根据 generator 的代码生成相应的对象和代码, 但开销较小.
不考虑实现细节, generator 在用法上和 iterator 类似, 例如 Rust Iterator
Backlinks
Tensorflow Architecture: Parallism (Tensorflow Architecture: Parallism > Parallelism > data parallelism > generator): tensorflow dataset api 是一个 generator 模式的 api, 用来构造 data pipeline, 例 如:
1.3. coroutine
generator 通常只有被调用者是 generator, 需要有 resume 的能力. 对于 coroutine 来说, 任何一方都可以 resume, 例如:
__coroutine__ def ping(): yield_to pong __coroutine__ def pong(): yield_to ping
实现上, 可以通过 generator 加 dispatcher 的方式来实现 coroutine
def dispatcher(entry): while (true): entry = entry.resume(); def ping(): yield_to pong def pong(): yield_to ping dispatcher(ping)
1.4. rust async/await
与 generator 类似, async/await 也可以看作是 coroutine 的特殊形式.
// [dependencies] // futures = "*" // async-std = "*" use async_std; use futures; use std::time::Duration; async fn run_sleep(t: u64) { async_std::task::sleep(Duration::from_millis(t)).await; println!("{:?}", t); } async fn async_main() { futures::join!(run_sleep(1000u64), run_sleep(1000u64), run_sleep(1000u64)); } fn main() { futures::executor::block_on(async_main()); }
run_sleep 中的 await 并不会阻塞, 它会返回一个 future 对象给 async_main, 后者可以在多个 future 中 poll, 若某个 future ready, 则会通过 corouting 让 run_sleep 继续执行, 以达于多个 run_sleep 并发执行的效果: 三个 run_sleep 在 1s 后同时结束
1.5. coroutine/async/await 的用途
coroutine 的用途等同于如下的问题:
Q: 单核系统上使用多线程有什么用处?
A: 实现并发, 由 kernel 安排多个任务并发执行, 但并非并行执行.
coroutine 与单核上的多线程类似, 可以用来实现单线程的并发, 因为各个函数可以随意的转移控制并 resume.
单核多线程与 coroutine 有类似的对应关系:
单核多线程 | coroutine |
---|---|
thread | coroutine |
进程调度 | dispatcher |
thread 上下文的保存与恢复 | coroutine 上下文的保存与恢复 |
IO 中断 | epoll 轮询 |
1.5.1. 代替 callback
1.5.2. 和 epoll 结合实现高吞吐量的 IO 操作
def dispatcher(): while epoll(fdset): fd = getfd() mapping[fd].resume() async def process(): await connect await get_data async def connect(): mark(fd, connect) yield_to dispatcher async def get_data(): mark(fd, get_data) yield_to dispatcher process()
实际上 async/await 是用一种模拟同步执行的写法表示 callback