Python协程从入门到放弃到死亡到重生
in Python with 0 comment

Python协程从入门到放弃到死亡到重生

in Python with 0 comment

前言


协程这个概念很早就听说了,但一直处于了解阶段,个人感觉协程是未来IO密集型+异步处理的趋势,因为效果很明显。这篇文章记录自己学习协程的过程。

同步 异步 阻塞和非阻塞


同步(sync): 在发出一个功能调用时,在没有得到结果之前,改调用不返回。
异步(Async): 当一个异步过程功能调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
阻塞(Block): 阻塞调用是指调用结果返回之前,当前线程被挂起。(其线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即现场暂停运行,将计算资源让给其他活动线程,当I/O操作结束,改线程阻塞状态解除,重新变成活动线程,继续争夺CPU资源)函数只有在得到结果之后才返回。和同步调用不同的是,同步调用很多时候线程还是激活的,只是从逻辑上当前函数还没有返回而已。
非阻塞(Unblock):非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。

这里有一个形象的比喻:

如果小明点击下载按钮之后,就一直干瞪着进度条不做其他任何事情直到软件下载完成,这是同步阻塞;
如果小明点击下载按钮之后,就一直干瞪着进度条不做其他任何事情直到软件下载完成,但是软件下载完成其实是会「叮」的一声通知的(但小明依然那样干等着),这是异步阻塞;(不常见)
如果小明点击下载按钮之后,就去做其他事情了,不过他总需要时不时瞄一眼屏幕看软件是不是下载完成了,这是同步非阻塞;
如果小明点击下载按钮之后,就去做其他事情了,软件下载完之后「叮」的一声通知小明,小明再回来继续处理下载完的软件,这是异步非阻塞。

简单来说,区分同步和异步看是否有调用完的“通知”,阻塞和非阻塞看是否轮询。异步非阻塞 不去沦陷,只等待通知。

并行和并发

简单来说,并行是指两个或者多个事件在同一时刻发生,通常是当系统有一个以上CPU或CPU核心时,才有可能并行。两个或者多个事件、线程并步抢占CPU资源。而并发是指两个或者多个事件在同一时间间隔内发生,在一定时间间隔内,有多个程序在执行,但在同一时刻,只有一个程序在执行。

生成器和迭代器


通过一个简单的例子理解一下:
生成一个斐波那契數列,输出前N个:

def fab(max):
   n, a, b = 0, 0, 1
   L = []
   while n < max:
       L.append(b)
       a, b = b, a + b
       n = n + 1
   return L

这个代码看上去是没有问题的,但是当需要生成的数列数量非常大时,由于代码中将数字存储在list中,会导致巨大内存占用。能不能优化?

生成器:

如果这个元素可以通过某种方式推算出来切可以进行循环操作,就避免了大的内存占用。只需要函数在循环时计算得下一个数字并返回,这样就不必创建完整的list,从而节省大量空间。在Python'中,这种一边循环一边计算的机制,称为生成器:generator

class Fab(object):
 
   def __init__(self, max):
       self.max = max
       self.n, self.a, self.b = 0, 0, 1
 
   def __iter__(self):
       return self
 
   def next(self):
       if self.n < self.max:
           r = self.b
           self.a, self.b = self.b, self.a + self.b
           self.n = self.n + 1
           return r
       raise StopIteration()

上述代码通过类的形式将函数封装为一个可迭代对象。通过next方法在循环的时候每次去取一个数,只有在需要使用的时候才会生成,内存占用很小。但是,上述代码较为繁琐,在Python中,有一种语法糖能简化,那就是yield

yield语法糖

def fab(max):
    n, a, b = 0, 0, 1
    while n < max:
        yield b
        # print b
        a, b = b, a + b
        n = n + 1

调用和class版的完全一致,也可以使用next方法等。简单的说,yield的作用就是把一个函数变为一个generator,带有yield的函数不再是一个普通函数,Python解释器会将器视为generator。在for循环执行时,每次循环都会执行fab函数内部的代码,执行到yield时。函数就返回一个迭代值,下次迭代时,就从yield的下一句继续执行。调用next也是同理。
当函数执行结束时,会抛出StopIteration异常,表示迭代完成。

迭代器

Python中,一个可以被用来for循环的对象可统称为可迭代对象。可以用isinstance()判断一个对象是否为可迭代对象(Iterable)。生成器不但可以作用于for循环,还可以被next函数不断调用返回下一个值,最后抛出异常。而迭代器可以被next()函数调用并不断返回下一个值的对象称为迭代器(Iteratot)对象。

生成器和协程

从语法上讲,生成器是一个带yield语句的函数。协程,又称微线程纤程,英文Coroutine。先看看协程的概念:

协程通过允许多个入口点在某些位置暂停和恢复执行来概括用于非抢占式多任务的子程序。

从某些角度来理解,协程其实就是一个可以暂停执行的函数,并且可以恢复继续执行。我们知道,yield关键字已经可以暂停执行,如果在暂停后有方法把一些值发送回到暂停窒息的函数中,那么便就可以理解为协程。在Python PEP 342中,添加了“把东西发回已经暂停的生成器中”的方法,这个方法就是send()

生成器和协程都是通过pythonyield的关键字实现的,不同的是,生成器只调用next来不断的生成数据,而协程会调用nextsend来返回结果和接收参数。

def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_cnt = yield b
        print('let me think {0} secs'.format(sleep_cnt))
        time.sleep(sleep_cnt)
        a, b = b, a + b
        index += 1
print('-'*10 + 'test yield send' + '-'*10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib)
while True:
    print(fib_res)
    try:
        fib_res = sfib.send(random.uniform(0, 0.5))
    except StopIteration:
        break

其中next(sfib)相当于sfib.send(None),可以使得sfib运行至第一个yield处返回。后续将一个随机的秒数发送给sfib,作为当前中断的yield表达式的返回值。
yield 表达式的作用包含了三个步骤:
1、 向函数外抛出值
2、 暂停,等待next()send()恢复。
3、 赋值,接受send发送过来的数据。
需要注意的是,在使用send(None)或者next()预激生成器函数,并执行到第一个yield语句结束的位置时,实际程序只执行了1、2两步,程序返回了值,并暂停,并没有执行第三步去赋值。在后续的循环中,才会进行赋值。

yield from

Python3.3出现了yield from语法,yield from item 表达式从item中获得迭代器。yield from 可以代替for 循环,使得代码更为精炼。yield from后面需要加的是可迭代对象。

def first_gen():
    for c in "AB":
        yield c
    for i in range(0, 3):
        yield i
 
print(list(first_gen()))

def second_gen():
    yield from "AB"
    yield from range(0, 3)

print(list(second_gen()))

结果:

['A', 'B', 0, 1, 2]
['A', 'B', 0, 1, 2]

yiled from后面加上一个生成器之后,就实现了生成的嵌套。实现生成器的嵌套,不一定要使用yield from ,但它可以让我们避免让自己处理各种料想不到的异常。如果自己去实现,会加大编码的难度。

yield from的主要功能是打开双向通道,把最外层的调用与最内层的子生成器连接起来,这样二者就可以直接发送和产出值,还可以直接穿入异常。

coloutine1.jpg

委派生成器在yied from表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出值发给调用方,子生成器返回之后,解释器会抛出StopIteration异常。
委托生成器的作用就是:在调用方与子生成器之间建立一个双向通道。

为什么一定要使用yield from 语句呢:
在使用yiled from 语句时,语句为我们已经处理了很多的异常:

coloutine2.jpg

greenletgevent

greenlet可以实现协程,不过每一次都要人为去指向下一个该执行的协程。greenlet可以从一个协程切换到任意其他协程,但必须保证greenlet的正常结束,在协程之间的任意切换很容易出现问题。greeletStackless发展来的Cpython扩展包,greelet是底层实现了原生协程的C扩展库。

使用greenlet实现的生产者-消费者模型:

# 基于greenlet的生产者消费者协程
from greenlet import greenlet
import random
import time

def Producer():
    while True:
        item = random.randint(1, 10)
        print("生产<{}>中...".format(item))
        time.sleep(1)
        c.switch(item)  # 切换到消费者,并将item传入。


def Consumer():
    while True:
        item = p.switch()  # 切换到生产者。等待生产者传递参数item
        print("消费<{}>中..".format(item))

c = greenlet(Consumer)  # 将普通函数编程协程
p = greenlet(Producer)  # 同理
c.switch()  # 启动协程,Consumer先执行
"""
从consumer开始执行,执行到item=p.switch()时,程序切换到producer,并等待传参
producer得到执行权后,生成一个item,并往下执行代码
当producer执行到c.switch(item)时,程序携带传递的item切换到consumer,
consumer继续往下执行,直到下一次运行到p.switch时,交出执行权,切换到producer,重复以上过程
"""

greenlet的价值在于高性能的原生协程,且语义更加明确、显式切换,执行到switch时就切换程序
直接将函数包装成协程,可以保留原代码的风格

gevent

gevent是实现协程的第三方库,通过封装greenletepoll回调编程模式,生成器协程实现。当遇到IO操作时,就自动切换到其他协程,等到IO操作完成,再在适当的时候切换回来继续执行。gevent会自动切换协程。就保证总有协程在执行,而不是等待IO。由于切换实在IO操作时自动完成。所以gevent需要修改Python的自带的一些保准库,这一过程在启动时通过monkey patch完成。

"""
gevent: 通过greenlet实现协程,核心就是遇到IO操作,会自动切换到其他协程

"""
# 将python标准库中的一些阻塞操作变为非阻塞
from gevent import monkey;monkey.patch_all()
# 使用猴子补丁要写在第一行
import gevent

def test1():
    print("test1")
    gevent.sleep(0)  # 模拟耗时操作
    print("test11")

def test2():
    print("test2")
    gevent.sleep(0)  # 模拟耗时操作
    print("test22")

# g1 = gevent.spawn(test1)  # 将函数封装成协程,并启动
# g2 = gevent.spawn(test2)
# gevent.joinall([g1, g2])

"""
# joinall() 阻塞当前流程,执行给定的greenlet(列表中的对象),等待程序执行完
# spawn是启动协程,参数为函数名及其参数

运行结果:

test1
test2
test11
test22

代码执行test1,打印test1,遇到gevent.sleep(0)时切换程序,执行test2
test()执行,打印test2,执行到gevent.sleep(0)时切换程序
执行test1在gevent.sleep(0)后面的代码,直到再次遇到gevent时,切换程序
然后在test2中,继续执行gevent后的代码,直到遇到gevent时,再次切换
直到程序执行完毕

"""

gevent的价值在于它的使用基于epoll的libev来避开阻塞;使用基于gevent的高效协程,来切换执行
只在遇到阻塞的时候切换,没有轮询和线程开销。

asyncio.coroutine

Python3.4中加入了asyncio库,使得Python获得了事件循环的特性,但这个还是以生成器对象为基础,yield fromasyncio模块中很常用。通过asnyncio+生成器,我们可以实现这样一个异步的模型:

import asyncio

@asyncio.coroutine
def counttdown(number, n):
    while n > 0:
        print("T-minus", n, "({})".format(number))
        yield from asyncio.sleep(1)
        n -= 1

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(counttdown("A", 2)),
    asyncio.ensure_future(counttdown("B", 5)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

这里asyncio.coroutine装饰器是用来标记这个函数是一个协程,因为asyncio要求所有用作协程的生成器必须由asyncio.coroutine装饰。这段代码中,事件循环会启动两个countdown() 协程,它们会一直执行,知道遇到yield from asyncio.sleep(),暂停执行,并将一个async.Future对象返回给事件循环。事件循环会监控这个asyncio.Future对象,一旦执行完成后,会将这个Future的执行结果返回给刚刚因为这个Future暂停的协程,并且继续执行原协程。

在上面的代码中,asyncio.sleep中,创建了一个Futrure对象,作为更内层的协程对象,通过yield from交给了事件循环,而Future是一个实现了__iter__对象的生成器。

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay,
                                future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

 class Future:
#blabla...
    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

当协程yield from asyncio.sleep时,事件循环其实是与Future对象建立了联系。程序运行结果如下:

coroutine3.jpg

asyncawait

Python3.5中引入了asyncawait,可以将它们理解为asyncio.coroutine / yield from的完美替身,async/await让协程表面上独立于生成器而存在,将细节隐藏于asyncio模块之下。使用await可以针对耗时的操作进行挂起,类似于生成器里的yield一样,使函数让出控制权。协程遇到await,事件循环挂起该协程,直到其他协程也挂起或者执行完毕,再进行下一个协程的执行。耗时的操作一般是一些IO操作,如网络请求,文件读取等。这里可以使用asyncio.sleep来进行模拟举例:

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

print('TIME: ', now() - start)
Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
TIME:  4.003541946411133

sleep的时候,使用await让出控制权。当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。

注意的区别是:await接受的对象必须是一个awaitable的对象,所谓awaitable的对象,就是一个实现了__await()__方法的对象,而且这个方法必须返回一个不是协程的迭代器。在Python3.6中,yieldawait也可以在同一个函数中使用,初次之外,也可以在列表推导等地方使用async forawait语法。

result = [i async for i in aiter() if i % 2]
result = [await func() for fun in funcs if await condition()]

async def test(x, y):
    for i in range(y):
        yield i
        await asyncio.sleep(x)

协程与异步

与多线程编程不同的是,多个协程总是运行在同一个线程中,一旦其中的一个协程发生阻塞行为,进而所有的协程都无法继续运行。例如在我们进行爬虫编写时,习惯使用requests库,而这个库就是阻塞的。尝试使用协程的方式进行编写:

import asyncio
import requests
import time
 
start = time.time()
 
async def get(url):
    return requests.get(url)
 
async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447

而不使用协程,使用普通方式,也是这个时间。为什么会这样呢,究其原因是requests并不支持异步操作。在运行时阻塞并未挂起。另外await后面所跟的对象必须是:一个原生coroutine对象,一个由types.coroutine装饰的生成器,这个生成器可以返回coroutine对象。而requests返回的对象不符合上述条件。为了程序运行不报错,上面代码在await时对requsts进行了一次async函数的包装,但是它并不是“原生的coroutine对象”,因此也就不能真正异步了。
可以通过使用实现了异步的aiohttp或者Trip库改写上述爬虫。

import asyncio
import time

import aiohttp

from spider_normal import targets, show_results

final_results = {}

async def get_content(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            content = await resp.read()
            return len(content)

async def spider(url):
    length = await get_content(url)
    final_results[url] = length
    return True

def main():
    loop = asyncio.get_event_loop()
    cor = [spider(url) for url in targets]
    start_time = time.time()
    result = loop.run_until_complete(asyncio.gather(*cor))
    print("Use time: {:.2f}s".format(time.time() - start_time))
    show_results(final_results)
    print("loop result: ", result)

if __name__ == '__main__':
    main()

可以看到,运行时间大幅度降低。

coroutine4.jpg

总结


这篇文章记录自己对协程的一些简单理解和认知,某些细节较为简浅粗糙,协程目前在见到的项目中解除不多,更多的是使用多进程或多线程的方式去处理。通过这篇文章,可以看到,协程相对我们熟悉的线程,多进行还是有点难理解的,特别是协程内部处理以及异步操作原理,逻辑较为分散。希望以后的项目中能用到协程去处理问题。

REFERENCE


[1] https://cuiqingcai.com/6160.html
[2] https://bbs.ichunqiu.com/forum.php?mod=viewthread&tid=41931&highlight=%E5%8D%8F%E7%A8%8B
[3] https://bbs.ichunqiu.com/forum.php?mod=viewthread&tid=41930&highlight=%E5%8D%8F%E7%A8%8B
[4] http://python.jobbole.com/87310/
[5] http://python.jobbole.com/86069/
[6] https://www.ibm.com/developerworks/cn/opensource/os-cn-python-yield/
[7] https://www.cnblogs.com/zingp/p/5911537.html

Responses