协程 这一章来介绍一下Python最鲜为人知的, (表面上看起来)最无用的特性,也就是协程(Coroutine). 本章的notebook
文件见这里 . 协程是一种流程控制工具, 可以把它看作一种轻量级的线程,但是它的中断和继续都是由程序控制(而非系统阻塞),因此效率更高.
这么说可能有些难以理解,下面就一步步深入了解协程.
## 从生成器到协程
在Python2.5以后,生成器API中加入了`send`方法, 该方法可以发送数据并将该数据作为生成器函数中`yield`表达式的值. 自此, 生成器就可以用作协程.
下面介绍一个最简单的协程用法:
1 2 3 4 def simple_coroutine (): print('开始协程' ) x = yield print('协程接收到: ' , x)
1 2 my_coro = simple_coroutine() my_coro
<generator object simple_coroutine at 0x000001EC20B4C468>
开始协程
协程接收到: 58
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-6-fe8fce2fd396> in <module>()
----> 1 my_coro.send(58)
StopIteration:
上述代码给出了一个用生成器实现的协程, 和之前见过的生成器最大的不同在于yield
出现在了右边,而且关键字后面没有表达式. 这说明协程会从调用方接收数据.
定义好该协程后, 我们调用函数得到一个生成器对象my_coro
, 因为一开始生成器还没有启动,所以我们需要先调用next
函数. 然后执行完打印”开始协程”后, 流程来到yield
处, 并在收到send
函数发送的数据(58)后执行. 接着协程恢复, 一直运行到下一个yield
表达式, 或者到末尾中止(抛出StopIteration
). 协程有四种不同的状态(当前状态可以用inspect.getgeneratorstate
来获取):
GEN_CREATED 等待开始执行
GEN_RUNNING 解释器正在执行(仅出现在多线程应用中)
GEN_SUSPENDED 在yield
表达式出暂停
GEN_CLOSED 执行结束
仅当协程处于暂停状态时才能调用send
方法, 唯一的例外是当协程未激活(等待开始时),可以调用.send(None)
来激活它, 其效果等同于.next(my_coro)
.
下面用一个产出多个值的例子来更好的理解协程:
1 2 3 4 5 6 7 8 9 def simple_coro2 (a ): print('开始协程, a=' , a) b = yield a print('协程接收到: b=' , b) c = yield a + b print('协程接收到: c=' , c) my_coro2 = simple_coro2(14 ) from inspect import getgeneratorstate
1 getgeneratorstate(my_coro2)
'GEN_CREATED'
开始协程, a= 14
14
1 getgeneratorstate(my_coro2)
'GEN_SUSPENDED'
协程接收到: b= 28
42
协程接收到: c= 99
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-13-627eb114cd58> in <module>()
1 # 过程类似上一步,不过这里最后协程中止,抛出异常
----> 2 my_coro2.send(99)
StopIteration:
1 getgeneratorstate(my_coro2)
'GEN_CLOSED'
上面的代码最难懂的是b = yield a
这行, 这里右边的代码yield a
会在赋值之前先执行, 然后b
的值在下一次激活协程时再设定(而不是直接赋值为a
). 这里的代码不是非常直观, 需要一段时间才能习惯.
我们可以注意到,在上述代码中,每行yield
表达式既是上一阶段的结束, 又是下一阶段的开始.
使用协程计算移动平均值 下面我们用一个更复杂的示例来说明协程的行为. 这里我们希望用它来计算移动平均值:
1 2 3 4 5 6 7 8 9 def averager (): total = 0.0 count = 0 average = None while True : term = yield average total += term count += 1 average = total/count
这里的协程用来一个无限循环while True
来计算移动平均值, 那么它会一直接受值然后生成结果, 除非调用方调用了close
方法来关闭它. 使用协程的好处是不用再用闭包来保存上下文. 用法如下:
1 2 coro_avg = averager() next (coro_avg)
10.0
20.0
15.0
由于在启动协程前,我们需要用next
预先激活(prime)它, 这一步容易忘记,因此我们可以用一个特殊的装饰器来避免遗漏.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from functools import wrapsdef coroutine (func ): @wraps(func ) def primer (*args, **kwargs ): gen =func(*args, **kwargs) next (gen) return gen return primer @coroutine def averager (): total = 0.0 count = 0 average = None while True : term = yield average total += term count += 1 average = total/count
1 2 coro_avg2 = averager() getgeneratorstate(coro_avg2)
'GEN_SUSPENDED'
上面的输出指明了,协程已经准备好可以接受值了,这是因为我们用装饰器使其在声明后就进行了预激.
终止协程和处理异常 协程中未处理的异常会向上回溯, 传给next
或是send
方法的调用者:
1 2 coro_avg = averager() coro_avg.send(40 )
40.0
TypeError Traceback (most recent call last)
<ipython-input-25-a8ca1776997a> in <module>()
----> 1 coro_avg.send('miao')
<ipython-input-21-93a336033508> in averager()
16 while True:
17 term = yield average
---> 18 total += term
19 count += 1
20 average = total/count
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
StopIteration Traceback (most recent call last)
<ipython-input-27-2808c6331bad> in <module>()
----> 1 coro_avg.send(10)
StopIteration:
上面我们故意传入了一个字符串,引发了异常. 由于协程内没有处理异常,所以协程会终止,再次重新激活协程会抛出异常.
其实这就给出了一种终止协程的方法: 发送某个哨符值让协程退出. 常用的特殊值有None
和Ellipsis
. 我们也可以用两种方法,显式地把异常发给协程:
generator.throw
致使生成器在暂停的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异 常,代码会向前执行到下一个 yield 表达式,而产出的值会成为调用 generator.throw 方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。
generator.close
致使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。如果生成器没有处 理这个异常,或者抛出了 StopIteration 异常(通常是指运行到结尾),调用方不会报错。如果收到 GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出 RuntimeError 异常。生成器抛出的其他异常会向上冒泡,传给调用方。
下面举例说明一下这两个函数的用法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class DemoException (Exception ): """为演示构建的异常类""" def demo_exc_handling (): print('-> 协程开始' ) while True : try : x = yield except DemoException: print('*** 处理异常类. 继续执行...' ) else : print('-> 协程接收到 {!r}' .format (x)) raise RuntimeError('这一行应该永远不会被执行.' )
对于这样一个协程,正常的调用流程是这样的.
1 2 exc_coro = demo_exc_handling() next (exc_coro)
-> 协程开始
-> 协程接收到 5
-> 协程接收到 10
1 getgeneratorstate(exc_coro)
'GEN_CLOSED'
现在我们尝试传入异常,看一下协程如何处理.
1 2 exc_coro = demo_exc_handling() next (exc_coro)
-> 协程开始
-> 协程接收到 5
1 exc_coro.throw(DemoException)
*** 处理异常类. 继续执行...
1 getgeneratorstate(exc_coro)
'GEN_SUSPENDED'
可以看到这里协程对DemoException
做了处理,然后继续运行, 处于GET_SUSPENDED
等待状态.
但是当传入的异常没有处理时,协程就会自动停止,变为关闭状态, 如:
1 2 3 exc_coro = demo_exc_handling() next (exc_coro)exc_coro.send(5 )
-> 协程开始
-> 协程接收到 5
1 exc_coro.throw(ZeroDivisionError)
ZeroDivisionError Traceback (most recent call last)
<ipython-input-39-ab264ab92c46> in <module>()
----> 1 exc_coro.throw(ZeroDivisionError)
<ipython-input-28-152a963a63ce> in demo_exc_handling()
6 while True:
7 try:
----> 8 x = yield
9 except DemoException:
10 print('*** 处理异常类. 继续执行...')
ZeroDivisionError:
1 getgeneratorstate(exc_coro)
'GEN_CLOSED'
让协程返回值 这里我们用一个新版的移动平均值计算协程来说明 如何让协程返回值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from collections import namedtuple Result = namedtuple('Result' , 'count average' ) def averager (): total = 0.0 count = 0 average = None while True : term = yield if term is None : break total += term count += 1 average = total/count return Result(count, average)
1 2 coro_avg = averager() next (coro_avg)
StopIteration Traceback (most recent call last)
<ipython-input-46-af5e8de3e651> in <module>()
----> 1 coro_avg.send(None)
StopIteration: Result(count=3, average=15.5)
可以看到我们的返回值保存在了抛出的异常的value
中,那么如何获取返回值呢?
1 2 3 4 5 6 7 8 9 10 coro_avg = averager() next (coro_avg)coro_avg.send(10 ) coro_avg.send(30 ) coro_avg.send(6.5 ) try : coro_avg.send(None ) except StopIteration as exc: result = exc.value result
Result(count=3, average=15.5)
这样我们弯弯绕绕就获得了协程的返回值, 下面我们来讨论yield from
的结构.
使用yield from yield from
的主要功能是打开双向通道,将最外层的调用方和最内层的子生成器连接起来,这样二者可以直接发送和产出值, 还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的代码.
在介绍它之前, 我们先来说明一些专用的术语:
委派生成器 包含yield from <iterable>
表达式的生成器函数子生成器 从yield from
表达式中<iterable>
部分获取的生成器调用方 调用委派生成器的客户端代码.
下面给出一个例子来说明yield from
结构的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from collections import namedtupleResult = namedtuple('Result' , 'count average' ) def averager (): total = 0.0 count = 0 average = None while True : term = yield if term is None : break total += term count += 1 average = total/count return Result(count, average) def grouper (results, key ): while True : results[key] = yield from averager() def main (data ): results = {} for key, values in data.items(): group = grouper(results, key) next (group) for value in values: group.send(value) group.send(None ) report(results) def report (results ): for key, result in sorted (results.items()): group, unit = key.split(';' ) print('{:2} {:5} averaging {:.2f}{}' .format ( result.count, group, result.average, unit)) data = { 'girls;kg' : [40.9 , 38.5 , 44.3 , 42.2 , 45.2 , 41.7 , 44.5 , 38.0 , 40.6 , 44.5 ], 'girls;m' : [1.6 , 1.51 , 1.4 , 1.3 , 1.41 , 1.39 , 1.33 , 1.46 , 1.45 , 1.43 ], 'boys;kg' : [39.0 , 40.8 , 43.2 , 40.8 , 43.1 , 38.6 , 41.4 , 40.6 , 36.3 ], 'boys;m' : [1.38 , 1.5 , 1.32 , 1.25 , 1.37 , 1.48 , 1.25 , 1.49 , 1.46 ], }
9 boys averaging 40.42kg
9 boys averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m
上面的代码的作用是给data
字典中的各个字段求平均值. 简要说明一下整个流程:
在main
函数中,每次外层循环会新建一个grouper
实例并赋给group
,这里的group
就是委派生成器 .
接着,预激委派生成器,此时委派生成器进入while True
循环, 调用子生成器averager
后在yield from
处暂停.
内层循环调用send
把值发给avergaer
, 同时group
生成器仍然停在yield from
那里.
整个内层循环结束,group
生成器仍然停在yield from
那里.
生成器send(None)
,然后结束averger
实例,委派生成器进入循环下一次.
外层循环重新构建一个grouper
实例, 然后绑定到group
变量, 前一个实例被垃圾回收程序回收.
这里给出了yield from
结构最简单的用法, 即只有一个委派生成器和一个子生成器. 委派生成器相当于管道, 因此我们可以将任意数量的委派生成器连接到一起: 一个委派生成器使用yield from
调用一个子生成器, 而那个子生成器本身也是一个委派生成器, 以此类推, 只要链条最终使用yield
表达式结束.