前言
协程这个概念很早就听说了,但一直处于了解阶段,个人感觉协程是未来IO密集型+异步处理的趋势,因为效果很明显。这篇文章记录自己学习协程的过程。
同步 异步 阻塞和非阻塞
同步(sync)
: 在发出一个功能调用时,在没有得到结果之前,改调用不返回。异步(Async)
: 当一个异步过程功能调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。阻塞(Block)
: 阻塞调用是指调用结果返回之前,当前线程被挂起。(其线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即现场暂停运行,将计算资源让给其他活动线程,当I/O操作结束,改线程阻塞状态解除,重新变成活动线程,继续争夺CPU资源)函数只有在得到结果之后才返回。和同步调用不同的是,同步调用很多时候线程还是激活的,只是从逻辑上当前函数还没有返回而已。非阻塞(Unblock)
:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
这里有一个形象的比喻:
如果小明点击下载按钮之后,就一直干瞪着进度条不做其他任何事情直到软件下载完成,这是同步阻塞;
如果小明点击下载按钮之后,就一直干瞪着进度条不做其他任何事情直到软件下载完成,但是软件下载完成其实是会「叮」的一声通知的(但小明依然那样干等着),这是异步阻塞;(不常见)
如果小明点击下载按钮之后,就去做其他事情了,不过他总需要时不时瞄一眼屏幕看软件是不是下载完成了,这是同步非阻塞;
如果小明点击下载按钮之后,就去做其他事情了,软件下载完之后「叮」的一声通知小明,小明再回来继续处理下载完的软件,这是异步非阻塞。
简单来说,区分同步和异步看是否有调用完的“通知”,阻塞和非阻塞看是否轮询。异步非阻塞 不去沦陷,只等待通知。
并行和并发
简单来说,并行是指两个或者多个事件在同一时刻发生,通常是当系统有一个以上CPU或CPU核心时,才有可能并行。两个或者多个事件、线程并步抢占CPU资源。而并发是指两个或者多个事件在同一时间间隔内发生,在一定时间间隔内,有多个程序在执行,但在同一时刻,只有一个程序在执行。
生成器和迭代器
通过一个简单的例子理解一下:
生成一个斐波那契數列,输出前N个:
def fab(max):
n, a, b = 0, 0, 1
L = []
while n < max:
L.append(b)
a, b = b, a + b
n = n + 1
return L
这个代码看上去是没有问题的,但是当需要生成的数列数量非常大时,由于代码中将数字存储在list
中,会导致巨大内存占用。能不能优化?
生成器:
如果这个元素可以通过某种方式推算出来切可以进行循环操作,就避免了大的内存占用。只需要函数在循环时计算得下一个数字并返回,这样就不必创建完整的list
,从而节省大量空间。在Python'中,这种一边循环一边计算的机制,称为生成器:generator
。
class Fab(object):
def __init__(self, max):
self.max = max
self.n, self.a, self.b = 0, 0, 1
def __iter__(self):
return self
def next(self):
if self.n < self.max:
r = self.b
self.a, self.b = self.b, self.a + self.b
self.n = self.n + 1
return r
raise StopIteration()
上述代码通过类的形式将函数封装为一个可迭代对象。通过next方法在循环的时候每次去取一个数,只有在需要使用的时候才会生成,内存占用很小。但是,上述代码较为繁琐,在Python中,有一种语法糖能简化,那就是yield
。
yield
语法糖
def fab(max):
n, a, b = 0, 0, 1
while n < max:
yield b
# print b
a, b = b, a + b
n = n + 1
调用和class
版的完全一致,也可以使用next
方法等。简单的说,yield
的作用就是把一个函数变为一个generator
,带有yield
的函数不再是一个普通函数,Python
解释器会将器视为generator
。在for
循环执行时,每次循环都会执行fab
函数内部的代码,执行到yield
时。函数就返回一个迭代值,下次迭代时,就从yield
的下一句继续执行。调用next也是同理。
当函数执行结束时,会抛出StopIteration
异常,表示迭代完成。
迭代器
Python
中,一个可以被用来for
循环的对象可统称为可迭代对象。可以用isinstance()
判断一个对象是否为可迭代对象(Iterable)
。生成器不但可以作用于for
循环,还可以被next
函数不断调用返回下一个值,最后抛出异常。而迭代器可以被next()
函数调用并不断返回下一个值的对象称为迭代器(Iteratot
)对象。
生成器和协程
从语法上讲,生成器是一个带yield语句的函数。协程,又称微线程
,纤程
,英文Coroutine
。先看看协程的概念:
协程通过允许多个入口点在某些位置暂停和恢复执行来概括用于非抢占式多任务的子程序。
从某些角度来理解,协程其实就是一个可以暂停执行的函数,并且可以恢复继续执行。我们知道,yield
关键字已经可以暂停执行,如果在暂停后有方法把一些值发送回到暂停窒息的函数中,那么便就可以理解为协程。在Python PEP 342
中,添加了“把东西发回已经暂停的生成器中”的方法,这个方法就是send()
。
生成器和协程都是通过python
中yield
的关键字实现的,不同的是,生成器只调用next
来不断的生成数据,而协程会调用next
和send
来返回结果和接收参数。
def stupid_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_cnt = yield b
print('let me think {0} secs'.format(sleep_cnt))
time.sleep(sleep_cnt)
a, b = b, a + b
index += 1
print('-'*10 + 'test yield send' + '-'*10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib)
while True:
print(fib_res)
try:
fib_res = sfib.send(random.uniform(0, 0.5))
except StopIteration:
break
其中next(sfib)
相当于sfib.send(None)
,可以使得sfib
运行至第一个yield
处返回。后续将一个随机的秒数发送给sfib
,作为当前中断的yield
表达式的返回值。yield
表达式的作用包含了三个步骤:
1、 向函数外抛出值
2、 暂停,等待next()
或send()
恢复。
3、 赋值,接受send
发送过来的数据。
需要注意的是,在使用send(None)
或者next()
预激生成器函数,并执行到第一个yield
语句结束的位置时,实际程序只执行了1、2两步,程序返回了值,并暂停,并没有执行第三步去赋值。在后续的循环中,才会进行赋值。
yield from
在Python3.3
出现了yield from
语法,yield from item
表达式从item
中获得迭代器。yield from
可以代替for
循环,使得代码更为精炼。yield from
后面需要加的是可迭代对象。
def first_gen():
for c in "AB":
yield c
for i in range(0, 3):
yield i
print(list(first_gen()))
def second_gen():
yield from "AB"
yield from range(0, 3)
print(list(second_gen()))
结果:
['A', 'B', 0, 1, 2]
['A', 'B', 0, 1, 2]
当yiled from
后面加上一个生成器之后,就实现了生成的嵌套。实现生成器的嵌套,不一定要使用yield from
,但它可以让我们避免让自己处理各种料想不到的异常。如果自己去实现,会加大编码的难度。
yield from
的主要功能是打开双向通道,把最外层的调用与最内层的子生成器连接起来,这样二者就可以直接发送和产出值,还可以直接穿入异常。
委派生成器在yied from
表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出值发给调用方,子生成器返回之后,解释器会抛出StopIteration
异常。
委托生成器的作用就是:在调用方与子生成器之间建立一个双向通道。
为什么一定要使用yield from
语句呢:
在使用yiled from
语句时,语句为我们已经处理了很多的异常:
greenlet
和gevent
greenlet
可以实现协程,不过每一次都要人为去指向下一个该执行的协程。greenlet
可以从一个协程切换到任意其他协程,但必须保证greenlet
的正常结束,在协程之间的任意切换很容易出现问题。greelet
是Stackless
发展来的Cpython
扩展包,greelet
是底层实现了原生协程的C扩展库。
使用greenlet
实现的生产者-消费者
模型:
# 基于greenlet的生产者消费者协程
from greenlet import greenlet
import random
import time
def Producer():
while True:
item = random.randint(1, 10)
print("生产<{}>中...".format(item))
time.sleep(1)
c.switch(item) # 切换到消费者,并将item传入。
def Consumer():
while True:
item = p.switch() # 切换到生产者。等待生产者传递参数item
print("消费<{}>中..".format(item))
c = greenlet(Consumer) # 将普通函数编程协程
p = greenlet(Producer) # 同理
c.switch() # 启动协程,Consumer先执行
"""
从consumer开始执行,执行到item=p.switch()时,程序切换到producer,并等待传参
producer得到执行权后,生成一个item,并往下执行代码
当producer执行到c.switch(item)时,程序携带传递的item切换到consumer,
consumer继续往下执行,直到下一次运行到p.switch时,交出执行权,切换到producer,重复以上过程
"""
greenlet
的价值在于高性能的原生协程,且语义更加明确、显式切换,执行到switch
时就切换程序
直接将函数包装成协程,可以保留原代码的风格
gevent
gevent
是实现协程的第三方库,通过封装greenlet
,epoll
回调编程模式,生成器协程实现。当遇到IO
操作时,就自动切换到其他协程,等到IO
操作完成,再在适当的时候切换回来继续执行。gevent
会自动切换协程。就保证总有协程在执行,而不是等待IO
。由于切换实在IO
操作时自动完成。所以gevent
需要修改Python
的自带的一些保准库,这一过程在启动时通过monkey patch
完成。
"""
gevent: 通过greenlet实现协程,核心就是遇到IO操作,会自动切换到其他协程
"""
# 将python标准库中的一些阻塞操作变为非阻塞
from gevent import monkey;monkey.patch_all()
# 使用猴子补丁要写在第一行
import gevent
def test1():
print("test1")
gevent.sleep(0) # 模拟耗时操作
print("test11")
def test2():
print("test2")
gevent.sleep(0) # 模拟耗时操作
print("test22")
# g1 = gevent.spawn(test1) # 将函数封装成协程,并启动
# g2 = gevent.spawn(test2)
# gevent.joinall([g1, g2])
"""
# joinall() 阻塞当前流程,执行给定的greenlet(列表中的对象),等待程序执行完
# spawn是启动协程,参数为函数名及其参数
运行结果:
test1
test2
test11
test22
代码执行test1,打印test1,遇到gevent.sleep(0)时切换程序,执行test2
test()执行,打印test2,执行到gevent.sleep(0)时切换程序
执行test1在gevent.sleep(0)后面的代码,直到再次遇到gevent时,切换程序
然后在test2中,继续执行gevent后的代码,直到遇到gevent时,再次切换
直到程序执行完毕
"""
gevent的价值在于它的使用基于epoll的libev来避开阻塞;使用基于gevent的高效协程,来切换执行
只在遇到阻塞的时候切换,没有轮询和线程开销。
asyncio.coroutine
在Python3.4
中加入了asyncio
库,使得Python
获得了事件循环的特性,但这个还是以生成器对象为基础,yield from
在asyncio
模块中很常用。通过asnyncio+生成器
,我们可以实现这样一个异步的模型:
import asyncio
@asyncio.coroutine
def counttdown(number, n):
while n > 0:
print("T-minus", n, "({})".format(number))
yield from asyncio.sleep(1)
n -= 1
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(counttdown("A", 2)),
asyncio.ensure_future(counttdown("B", 5)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
这里asyncio.coroutine
装饰器是用来标记这个函数是一个协程,因为asyncio
要求所有用作协程的生成器必须由asyncio.coroutine
装饰。这段代码中,事件循环会启动两个countdown()
协程,它们会一直执行,知道遇到yield from asyncio.sleep()
,暂停执行,并将一个async.Future
对象返回给事件循环。事件循环会监控这个asyncio.Future
对象,一旦执行完成后,会将这个Future
的执行结果返回给刚刚因为这个Futur
e暂停的协程,并且继续执行原协程。
event_loop
事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。coroutine
协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。task
任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete
方法将协程包装成一个任务(task
)对象。task
对象是Future
的子类,保存了协程运行后的状态,用于未来获取协程的结果。
在上面的代码中,asyncio.sleep
中,创建了一个Futrure
对象,作为更内层的协程对象,通过yield from
交给了事件循环,而Future
是一个实现了__iter__
对象的生成器。
@coroutine
def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
future = futures.Future(loop=loop)
h = future._loop.call_later(delay,
future._set_result_unless_cancelled, result)
try:
return (yield from future)
finally:
h.cancel()
class Future:
#blabla...
def __iter__(self):
if not self.done():
self._blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
当协程yield from asyncio.sleep
时,事件循环其实是与Future
对象建立了联系。程序运行结果如下:
async
和await
在Python3.5
中引入了async
和await
,可以将它们理解为asyncio.coroutine / yield from
的完美替身,async/await
让协程表面上独立于生成器而存在,将细节隐藏于asyncio
模块之下。使用await
可以针对耗时的操作进行挂起,类似于生成器里的yield
一样,使函数让出控制权。协程遇到await
,事件循环挂起该协程,直到其他协程也挂起或者执行完毕,再进行下一个协程的执行。耗时的操作一般是一些IO
操作,如网络请求,文件读取等。这里可以使用asyncio.sleep
来进行模拟举例:
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
在sleep
的时候,使用await
让出控制权。当遇到阻塞调用的函数的时候,使用await
方法将协程的控制权让出,以便loop
调用其他的协程。
注意的区别是:await
接受的对象必须是一个awaitable
的对象,所谓awaitable
的对象,就是一个实现了__await()__
方法的对象,而且这个方法必须返回一个不是协程的迭代器。在Python3.6
中,yield
和await
也可以在同一个函数中使用,初次之外,也可以在列表推导等地方使用async for
或await
语法。
result = [i async for i in aiter() if i % 2]
result = [await func() for fun in funcs if await condition()]
async def test(x, y):
for i in range(y):
yield i
await asyncio.sleep(x)
协程与异步
与多线程编程不同的是,多个协程总是运行在同一个线程中,一旦其中的一个协程发生阻塞行为,进而所有的协程都无法继续运行。例如在我们进行爬虫编写时,习惯使用requests
库,而这个库就是阻塞的。尝试使用协程的方式进行编写:
import asyncio
import requests
import time
start = time.time()
async def get(url):
return requests.get(url)
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'Result:', response.text)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447
而不使用协程,使用普通方式,也是这个时间。为什么会这样呢,究其原因是requests
并不支持异步操作。在运行时阻塞并未挂起。另外await
后面所跟的对象必须是:一个原生coroutine
对象,一个由types.coroutine
装饰的生成器,这个生成器可以返回coroutine
对象。而requests
返回的对象不符合上述条件。为了程序运行不报错,上面代码在await
时对requsts
进行了一次async
函数的包装,但是它并不是“原生的coroutine对象”,因此也就不能真正异步了。
可以通过使用实现了异步的aiohttp
或者Trip
库改写上述爬虫。
import asyncio
import time
import aiohttp
from spider_normal import targets, show_results
final_results = {}
async def get_content(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.read()
return len(content)
async def spider(url):
length = await get_content(url)
final_results[url] = length
return True
def main():
loop = asyncio.get_event_loop()
cor = [spider(url) for url in targets]
start_time = time.time()
result = loop.run_until_complete(asyncio.gather(*cor))
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
print("loop result: ", result)
if __name__ == '__main__':
main()
可以看到,运行时间大幅度降低。
总结
这篇文章记录自己对协程的一些简单理解和认知,某些细节较为简浅粗糙,协程目前在见到的项目中解除不多,更多的是使用多进程或多线程的方式去处理。通过这篇文章,可以看到,协程相对我们熟悉的线程,多进行还是有点难理解的,特别是协程内部处理以及异步操作原理,逻辑较为分散。希望以后的项目中能用到协程去处理问题。
REFERENCE
[1] https://cuiqingcai.com/6160.html
[2] https://bbs.ichunqiu.com/forum.php?mod=viewthread&tid=41931&highlight=%E5%8D%8F%E7%A8%8B
[3] https://bbs.ichunqiu.com/forum.php?mod=viewthread&tid=41930&highlight=%E5%8D%8F%E7%A8%8B
[4] http://python.jobbole.com/87310/
[5] http://python.jobbole.com/86069/
[6] https://www.ibm.com/developerworks/cn/opensource/os-cn-python-yield/
[7] https://www.cnblogs.com/zingp/p/5911537.html
本文由 dr0op 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Jan 23, 2019 at 10:57 am