Coroutine

Table of Contents

1. Coroutine

1.1. futures   ARCHIVE

1.2. python generator

python 的 yield 是一个 generator.

def get_next():
    i = 0;
    while (true):
        yield i
        i++

for i in get_next():
    print(i)
    # 输出 012345...

实际上 get_next() 返回了一个 generator, 上面的代码类似于:

gen = get_next();
while a = gen.resume():
    print(a)

generator yield 会把控制交回给调用者, 并通过一定方式保存了当前的上下文 (局部变量, 指令记数), 下一次 resume 时, 会恢复保存的上下文并直接从上次 yield 的地方开始执行

在实现上, 根据保存上下文的方式, 有两种方式来实现 generator:

  1. stackful

    每次 yield 时直接保存了当前的栈, resume 时恢复保存的栈. 这种方式比较简单直观, 但代价较大, 常用的手段有 setjump/longjump 和 makecontext/swapcontext

  2. stackless

    使用一个单独的对象来维护相关局部变量和状态, 例如:

    class GetNext(object):
        def __init__(self):
            self.i = 0
    
        def resume():
            while (true):
                if self.yield_point == YIELD1:
                    ...
                    self.yield_point = YIELD2;
                    return ...
                if self.yield_point == YIELD2:
                    ...
                    self.yield_point = ...
                    return ...
                ...
    

    stackless 的 generator 需要根据 generator 的代码生成相应的对象和代码, 但开销较小.

不考虑实现细节, generator 在用法上和 iterator 类似, 例如 Rust Iterator

Backlinks

Tensorflow Architecture: Parallism (Tensorflow Architecture: Parallism > Parallelism > data parallelism > generator): tensorflow dataset api 是一个 generator 模式的 api, 用来构造 data pipeline, 例 如:

1.3. coroutine

generator 通常只有被调用者是 generator, 需要有 resume 的能力. 对于 coroutine 来说, 任何一方都可以 resume, 例如:

__coroutine__ def ping():
    yield_to pong

__coroutine__ def pong():
    yield_to ping

实现上, 可以通过 generator 加 dispatcher 的方式来实现 coroutine

def dispatcher(entry):
    while (true):
      entry = entry.resume();

def ping():
    yield_to pong

def pong():
    yield_to ping

dispatcher(ping)

1.4. rust async/await

与 generator 类似, async/await 也可以看作是 coroutine 的特殊形式.

// [dependencies]
// futures = "*"
// async-std = "*"

use async_std;
use futures;
use std::time::Duration;

async fn run_sleep(t: u64) {
    async_std::task::sleep(Duration::from_millis(t)).await;
    println!("{:?}", t);
}

async fn async_main() {
    futures::join!(run_sleep(1000u64), run_sleep(1000u64), run_sleep(1000u64));
}

fn main() {
    futures::executor::block_on(async_main());
}

run_sleep 中的 await 并不会阻塞, 它会返回一个 future 对象给 async_main, 后者可以在多个 future 中 poll, 若某个 future ready, 则会通过 corouting 让 run_sleep 继续执行, 以达于多个 run_sleep 并发执行的效果: 三个 run_sleep 在 1s 后同时结束

1.5. coroutine/async/await 的用途

coroutine 的用途等同于如下的问题:

Q: 单核系统上使用多线程有什么用处?

A: 实现并发, 由 kernel 安排多个任务并发执行, 但并非并行执行.

coroutine 与单核上的多线程类似, 可以用来实现单线程的并发, 因为各个函数可以随意的转移控制并 resume.

单核多线程与 coroutine 有类似的对应关系:

单核多线程 coroutine
thread coroutine
进程调度 dispatcher
thread 上下文的保存与恢复 coroutine 上下文的保存与恢复
IO 中断 epoll 轮询

1.5.1. 代替 callback

1.5.2. 和 epoll 结合实现高吞吐量的 IO 操作

def dispatcher():
    while epoll(fdset):
        fd = getfd()
        mapping[fd].resume()

async def process():
    await connect
    await get_data

async def connect():
    mark(fd, connect)
    yield_to dispatcher

async def get_data():
    mark(fd, get_data)
    yield_to dispatcher


process()

实际上 async/await 是用一种模拟同步执行的写法表示 callback

Backlinks

Spike (Spike > interpreter > execute > host): spike 执行时分为 host 和 target 两部分, host 和 target 可以用不同的 thread 来实 现, 但默认配置下它们是通过 coroutine 实现的 (swapcontext)

Author: [email protected]
Date: 2019-11-28 Thu 00:00
Last updated: 2024-09-27 Fri 23:44

知识共享许可协议