跳转至

异步

协程

在传统线程的执行过程中, 函数都是顺序执行的. 而协程, Coroutine, 则不同, 在执行过程中, 其函数内部可以产生中断, 转而执行别的函数, 在适当的时候再返回来接着执行. 所以说, 这是一种并发执行, 和多线程差不多, 多线程也是并发执行, 而不是并行执行(由于GIL锁的存在).

协程的优势

协程和多线程比的优势就是:

  • 极高的执行效率: 在线程内部不涉及切换, 没有开销
  • 不需要多线程锁: 只有一个线程, 不存在像多线程那样子线程同时读写共享资源的冲突, 但是存在对其他进程之间的共享资源的读写冲突

协程实现并行

在单个线程内的协程是并发的, 可以用多进程+协程实现利用多核心CPU.

协程的实现方式

可以由多种方式实现协程, 其中主要包括生成器asyncio模块提供的async/await关键字.

生成器实现协程

生成器是一种强大的工具, 允许你通过惰性方式生成序列. 通常, 我们用生成器对象的next()方法获取下一个值. 然而, 生成器对象还有send()方法, 不仅可以恢复生成器的执行, 还可以向生成器对象发送一个值, 该值会被yield表示式接受并作为返回值.

例子

定义:

def echo_generator():
while True:
    received = yield
    print(f'Received: {received}')

gen = echo_generator()
next(gen)
gen.send('Hello')
gen.send('World')

执行:

$ python main.py
Received: Hello
Received: World
  1. 得到了一个生成器对象gen
  2. next(gen)用于启动生成器, 直到执行到yield中断
  3. gen.send('Hello'), 执行赋值操作received = 'Hello', 打印, 直到执行到yield中断
  4. gen.send('World'), 执行赋值操作received = 'World', 打印, 直到执行到yield中断

现在来看由生成器实现的一个协程:

例子

定义:

def consumer():
    v = ''
    while True:
        u = yield v
        if not u:
            return
        print('[CONSUMER] Consuming %s...' % u)
        v = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

执行:

$ python main.py
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
  1. 得到了一个生成器对象c, 将其传入produce()函数中
  2. c.send(None)用于启动生成器, 无法执行赋值操作u = None, 直到执行到yield ''中断, 返回给生产者''
  3. c.send(1), 执行赋值操作u = 1, 消费者消费, 直到执行yield '200 OK'中断, 返回给生产者'200 OK'
  4. c.send(2), 执行赋值操作u = 2, 消费者消费, 直到执行yield '200 OK'中断, 返回给生产者'200 OK'
  5. 继续下一轮循环
Tip

每次执行r = c.send([value])时, u = yield v语句的执行过程如下:

  1. 上次操作只执行了yield v但是没有执行赋值
  2. 从中断点继续执行另一半赋值u = [value]
  3. 执行逻辑
  4. 执行u = yield v中的yield v部分并中断, 不执行赋值
  5. r = v
笔记

传统的生产者-消费者模型是一个线程生产, 一个线程消费, 通过锁控制, 一不小心可能导致死锁. 如果改用协程, 生产者生产之后, 直接通过生成器对象的send()函数将物品转移给消费者消费, 待消费者消费完毕后, 切换到生产者继续生产, 效率极高.

asyncio模块实现协程

asyncio是Python 3.4版本引入的, 内置了对异步IO的支持. 它的编程模型就是一个事件循环, 我们从asyncio模块中直接获取一个EventLoop的引用, 然后把需要执行的协程放到EventLoop中执行, 就实现了异步IO.

事件循环

在系统中, 可以产生事件的实体叫做事件源, 能处理事件的实体叫做事件处理者. 此外, 还有一些第三方实体叫做事件循环. 它的作用是管理所有的事件, 在整个程序运行的过程中不断循环执行, 追踪事件发生的顺序将它们放到队列中, 当主线程空闲的时候, 调用相应的事件处理者来处理事件.

Tip

可以通过下列伪代码理解事件循环:

while (1) {
    events = getEvents();
    for (e in events)
        processEvent(e);
}

老写法

注意

@asyncio.coroutine在Python 3.11中已经被移除, 所以下列代码在Python 3.11+运行会报错.

@asyncio.coroutine将一个生成器标为coroutine类型, 然后就把这个生成器放到事件循环中执行.

例子

定义:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)

@asyncio.coroutine
def welcome():
    yield from asyncio.sleep(1)
    print('Welcome world! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), welcome()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

执行:

$ python main.py
Hello world! (<_MainThread(MainThread, started 140706241734464)>)
# 暂停了1秒
Welcome world! (<_MainThread(MainThread, started 140706241734464)>)

这里, yield from语法可以让我们很方便地调用另一个生成器, 当然你如果想自己写yield, 不想调用别人的生成器也可以, 你自己把这个IO操作写出来执行完了yield一下就可以了. 上面的案例本质上就是要执行asyncio.sleep(1), 执行到它里面的yield为止, 这个操作耗时1秒, 可以看成是一个1秒的IO操作, 在此期间, 主线程并未等待, 而是去事件循环中执行其他可执行的coroutine了, 如在welcome()这个coroutine等待结果的时候, hello()这个coroutine运行.

yield from

yield from可以用来调用其他的生成器.

例子

定义:

def sub_generator():
    yield "Sub generator: step 1"
    yield "Sub generator: step 2"
    yield "Sub generator: step 3"

def main_generator():
    print("Main generator: start")
    yield "Main generator: before sub generator"
    yield from sub_generator()
    yield "Main generator: after sub generator"
    print("Main generator: end")

gen = main_generator()
for value in gen:
    print(value)

执行:

$ python main.py
Main generator: start
Main generator: before sub generator
Sub generator: step 1
Sub generator: step 2
Sub generator: step 3
Main generator: after sub generator
Main generator: end

定义:

def sub_generator():
    yield "Sub generator: step 1"
    yield "Sub generator: step 2"
    return "Sub generator: done"

def main_generator():
    print("Main generator: start")
    result = yield from sub_generator()
    print(f"Main generator: received '{result}' from sub generator")
    yield "Main generator: end"

gen = main_generator()
for value in gen:
    print(value)

执行:

$ python main.py
Main generator: start
Sub generator: step 1
Sub generator: step 2
Main generator: received 'Sub generator: done' from sub generator
Main generator: end

新写法

为了简化并更好的标识异步IO, 从Python3.5开始引入了新的语法asyncawait. 要使用新的语法, 只需要做两步的简单替换:

  1. @asyncio.coroutine替换为async
  2. yield from替换为await
例子
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")