Python 协程的前世今生

生成器与协程

  在Python2.3中,生成器正式被纳入标准。在处理大量数据时,生成器非常有用,例如读取一个几G的文件,你没有必要将文件一次性读取到内存,而是读到一行就将其返回,这样程序不会阻塞在等待读取文件的过程中,也可以大大减少内存的占用。

1
2
3
4
5
6
7
def read_lazy(file_object):
"""Lazy function (generator) to read a file line by line."""
while True:
data = file_object.readline()
if not data:
break
yield data

  生成器执行时候每次遇到yield就会停顿一次并返回一个值,而你可以使用next()让生成器继续执行,直到遇到下一个yield

1
2
3
4
5
6
7
f = open('data')
while True:
try:
line = next(f)
print(line, end='')
except StopIteration:
break


  后来人们意识到了,既然有办法暂停一个函数的执行,稍后让它从暂停的地方继续执行,这不就满足了协程的概念吗?然而你只能获得生成器的返回值,而不能向生成器传递值。为支持协程,PEP 342提出了你可以使用send()向生成器发送一个值同时重启生成器的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
def grep(pattern):
try:
while True:
line = (yield)
if pattern in line:
print(line)
except GeneratorExit:
print('See you again!')
g = grep('Python')
g.send(None) # 启动协程
g.send("Python Cookbook")
g.close()

  既然一个协程在执行时候可以通过yield让出当前线程,那么我们就可以在不同的协程之间进行切换从而达到并发的效果,为此你需要编写一个任务调度器,实现基于Cooperative multitasking的任务调度,通过这个调度器管理不同协程的执行。


  下面我们实现一个基于协程的并发Echo Server,这个程序的核心在于任务调度器:

  • 当EchoServer在执行accept, recvsend这类I/O操作之前,会主动让出,这时控制权将移交到调度器手中。
  • 当调度器发现将要执行的I/O操作是acceptrecv时,就将任务放到等待读取的队列,若是send则将任务放到等待写入的队列。
  • 调度器会对这两个队列进行I/O轮询,并将就绪的任务放到就绪队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from collections import deque
from select import select
class Scheduler(object):
def __init__(self):
self.ready = deque() # 就绪队列 deque: coroutine
self.read_waiting = {} # 等待读取的队列 dict: socket -> coroutine
self.write_waiting = {} # 等待写入的队列 dict: socket -> coroutine
def add_ready(self, task):
self.ready.append(task)
def add_read(self, fileno, task):
self.read_waiting[fileno] = task
def add_write(self, fileno, task):
self.write_waiting[fileno] = task
def loop(self):
while any([self.ready, self.read_waiting, self.write_waiting]):
# 如果就绪队列是空的,就进行I/O轮询,等待事件就绪
while not self.ready:
can_read, can_write, _ = select(self.read_waiting,
self.write_waiting,
[])
for fileno in can_read:
self.ready.append(self.read_waiting.pop(fileno))
for fileno in can_write:
self.ready.append(self.write_waiting.pop(fileno))
task = self.ready.popleft()
try:
event, fileno = next(task) # 让协程执行到 yield
if event == 'read':
self.read_waiting[fileno] = task
elif event == 'write':
self.write_waiting[fileno] = task
else:
raise RuntimeError("ERROR!")
except StopIteration:
print('task done')
class AsyncSocket(object):
def __init__(self, sock):
self.sock = sock
def recv(self, maxsize):
yield 'read', self.sock # 主动让出,交给调度器调度
return self.sock.recv(maxsize)
def send(self, data):
yield 'write', self.sock # 主动让出,交给调度器调度
return self.sock.send(data)
def accept(self):
yield 'read', self.sock # 主动让出,交给调度器调度
client, addr = self.sock.accept()
return AsyncSocket(client), addr
def __getattr__(self, name):
return getattr(self.sock, name)
def echo_server(scheduler, address, backlog=5):
sock = AsyncSocket(socket(AF_INET, SOCK_STREAM))
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(backlog)
while True:
client_sock, client_addr = yield from sock.accept() # blocking
scheduler.add_ready(echo_handler(client_addr, client_sock))
def echo_handler(address, client_sock):
print('Got connection from {}'.format(address))
while True:
msg = yield from client_sock.recv(8192) # blocking
if not msg:
break
yield from client_sock.send(msg) # blocking
client_sock.close()
if __name__ == '__main__':
scheduler = Scheduler()
server = echo_server(scheduler, ('', 20000))
scheduler.add_ready(server)
scheduler.loop()

  这段代码实现了一个小型的协作式任务调度器。这里你应该意识到协程与线程的不同点了,线程的调度是基于抢占式调度的,在抢占式调度里面,一个线程可能来不及将寄存器变量写入到内存中,或者处理器的缓存来不及同步到主内存中,就被迫切换到另一线程,这就是线程中为什么需要使用各种同步的方式来保证数据的一致性。而协程是主动让出,所以不存在多个协程同时修改同一变量的情况,也就不需要使用互斥锁等同步方式了。
  而协作式调度的一个明显缺点就是,如果某个协程在执行阻塞的调用,或者执行密集的CPU计算,这时协程就无法让出,整个程序就会失去响应了。这就是为什么当今的操作系统放弃协作式调度的原因了。

异步编程

  在需要处理大量I/O操作的程序中,例如爬虫系统,当程序发起一个页面请求,到接收到响应这个过程中,程序可以先执行其它的任务,而不用处于阻塞状态,这就是异步编程。异步编程可以让程序一直保持在忙碌的状态,而不用浪费时间在一些无谓的等待中。Python3.4 引入了 asyncio 模块,用于支持异步编程。asyncio要求,如果你要将一个生成器当成协程来使用,那么应该使用asyncio.coroutine来装饰它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
@asyncio.coroutine
def countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number))
yield from asyncio.sleep(1)
n -= 1
loop = asyncio.get_event_loop()
tasks = [ asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 2))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

  这段代码的核心在于事件循环loop,事件循环的原理类似于任务管理器。事件循环的原理并不难理解:当协程执行到yield from的时候会暂定执行并返回一个asyncio.Future对象给looploop内部将维持一个队列来保存这些asyncio.Future对象,一旦loop监测到事件发生了(例如一秒过去了),就会将相应的asyncio.Future发送给原来的协程,这时协程将继续执行。

asyn/await 语法

  Python3.5 指出,你可以使用async将一个函数定义成协程,但async函数中却不能出现yield表达式。如果你要定义一个使用yield的协程,可以使用types.coroutine装饰器:

1
2
3
4
5
6
7
8
from types import coroutine
@coroutine
def foo():
yield 'foo'
g = foo()
print(g.send(None)) # 'foo'

  Python3.5 引入types.coroutine的目的在于区分生成器和协程。为弄清楚这个区别,首先得理解什么是Awaitable Objects,Python3.5 规定,await表达式只能用于awaitable对象。


  那么,有哪些是awaitable对象呢?

  • 实现了__await__()方法,并且这个方法返回一个可迭代的对象。
  • async定义的函数,这类函数对象实现了__await__()方法。
  • type.coroutine修饰的函数。
  • asyncio.coroutine修饰的函数。

  现在你该明白了,由types.coroutine修饰的函数可以用于await表达式中,而一般的生成器则不能:

1
2
3
4
5
6
7
8
9
10
11
from types import coroutine
@coroutine
def foo():
yield 'foo'
async def bar():
await foo()
b = bar()
print(b.send(None))

  你可以将await等同于yield from,但与yield from的差别是,await的使用范围更广,任务awaitable对象都可以用于await表达式中。

理解事件循环

  让我们编写一个并发的Echo Server,这个例子的核心在于asyncio的事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
loop = asyncio.get_event_loop()
async def echo_server(address, backlog=5):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(backlog)
sock.setblocking(False)
while True:
client, addr = await loop.sock_accept(sock)
print('Connection from', addr)
loop.create_task(echo_handler(client))
async def echo_handler(client):
with client:
while True:
data = await loop.sock_recv(client, 8192)
if not data:
break
await loop.sock_sendall(client, data)
print('Connection closed.')
if __name__ == '__main__':
loop.create_task(echo_server(('', 20000)))
loop.run_forever()

  David Beazley大神编写了一个事件循环Loop,用来模仿asyncio的事件循环。这个神奇的例子是不是很熟悉,很类似于我们前面的任务调度器。或许这个例子可以加深我们的理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from types import coroutine
from collections import deque
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@coroutine
def read_wait(sock):
yield 'read_wait', sock
@coroutine
def write_wait(sock):
yield 'write_wait', sock
class Loop(object):
def __init__(self):
self.ready = deque()
self.selector = DefaultSelector()
async def sock_recv(self, sock, maxbytes):
await read_wait(sock)
return sock.recv(maxbytes)
async def sock_accept(self, sock):
await read_wait(sock)
return sock.accept()
async def sock_sendall(self, sock, data):
while data:
await write_wait(sock)
nsend = sock.send(data)
data = data[nsend:]
def create_task(self, task):
self.ready.append(task)
def run_forever(self):
while True:
while not self.ready:
events = self.selector.select()
for key, _ in events:
self.ready.append(key.data)
self.selector.unregister(key.fileobj)
while self.ready:
self.current_task = self.ready.popleft()
try:
op, *args = self.current_task.send(None)
getattr(self, op)(*args)
except StopIteration:
pass
def read_wait(self, sock):
self.selector.register(sock, EVENT_READ, self.current_task)
def write_wait(self, sock):
self.selector.register(sock, EVENT_WRITE, self.current_task)

参考资料