阿布云

你所需要的,不仅仅是一个好用的代理。

Python的异步编程,从无到有(二)

阿布云 发表于

30.png

什么是协程?

协程,又称作 Coroutine 。从字面上来理解,即协同运行的例程,它是比是线程(thread)更细量级的用户态线程,特点是允许用户的主动调用和主动退出,挂起当前的例程然后返回值或去执行其他任务,接着返回原来停下的点继续执行。等下,这是否有点奇怪?我们都知道一般函数都是线性执行的,不可能说执行到一半返回,等会儿又跑到原来的地方继续执行。但一些熟悉python(or其他动态语言)的童鞋都知道这可以做到,答案是用yield语句。其实这里我们要感谢操作系统(OS)为我们做的工作,因为它具有getcontext和swapcontext这些特性,通过系统调用,我们可以把上下文和状态保存起来,切换到其他的上下文,这些特性为coroutine的实现提供了底层的基础。操作系统的Interrupts和Traps机制则为这种实现提供了可能性,因此它看起来可能是下面这样的:

1.png

理解生成器(generator)

学过生成器和迭代器的同学应该都知道python有yield这个关键字,yield能把一个函数变成一个generator,与return不同,yield在函数中返回值时会保存函数的状态,使下一次调用函数时会从上一次的状态继续执行,即从yield的下一条语句开始执行,这样做有许多好处,比如我们想要生成一个数列,若该数列的存储空间太大,而我们仅仅需要访问前面几个元素,那么yield就派上用场了,它实现了这种一边循环一边计算的机制,节省了存储空间,提高了运行效率。

这里以斐波那契数列为例:

def fib(max):

    n, a, b = 0, 0, 1

    while n  max:

        print b

        a, b = b, a + b

        n = n + 1

如果使用上述的算法,那么我每一次调用函数时,都要耗费大量时间循环做重复的事情。而如果使用yield的话,它则会生成一个generator,当我需要时,调用它的next方法获得下一个值,改动的方法很简单,直接把print改为yield就OK。

生产者-消费者的协程

#-*- coding:utf-8

def consumer():

    status = True

    while True:

        n = yield status

        print("我拿到了{}!".format(n))

        if n == 3:

            status = False

 

def producer(consumer):

    n = 5

    while n > 0:

    # yield给主程序返回消费者的状态

        yield consumer.send(n)

        n -= 1

 

if __name__ == '__main__':

    c = consumer()

    c.send(None)

    p = producer(c)

    for status in p:

        if status == False:

            print("我只要3,4,5就行啦")

            break

    print("程序结束")

上面这个例子是典型的生产者-消费者问题,我们用协程的方式来实现它。首先从主程序中开始看,第一句c = consumer(),因为consumer函数中存在yield语句,python会把它当成一个generator(生成器,注意:生成器和协程的概念区别很大,千万别混淆了两者),因此在运行这条语句后,python并不会像执行函数一样,而是返回了一个generator object。

再看第二条语句c.send(None),这条语句的作用是将consumer(即变量c,它是一个generator)中的语句推进到第一个yield语句出现的位置,那么在例子中,consumer中的status = True和while True:都已经被执行了,程序停留在n = yield status的位置(注意:此时这条语句还没有被执行),上面说的send(None)语句十分重要,如果漏写这一句,那么程序直接报错,这个send()方法看上去似乎挺神奇,等下再讲它的作用。

下面第三句p = producer(c),这里则像上面一样定义了producer的生成器,注意的是这里我们传入了消费者的生成器,来让producer跟consumer通信。

第四句for status in p:,这条语句会循环地运行producer和获取它yield回来的状态。

好了,进入正题, 现在我们要让生产者发送1,2,3,4,5给消费者,消费者接受数字,返回状态给生产者,而我们的消费者只需要3,4,5就行了,当数字等于3时,会返回一个错误的状态。最终我们需要由主程序来监控生产者-消费者的过程状态,调度结束程序。

现在程序流进入了producer里面,我们直接看yield consumer.send(n),生产者调用了消费者的send()方法,把n发送给consumer(即c),在consumer中的n = yield status,n拿到的是消费者发送的数字,同时,consumer用yield的方式把状态(status)返回给消费者,注意:这时producer(即消费者)的consumer.send()调用返回的就是consumer中yield的status!消费者马上将status返回给调度它的主程序,主程序获取状态,判断是否错误,若错误,则终止循环,结束程序。上面看起来有点绕,其实这里面generator.send(n)的作用是:把n发送generator(生成器)中yield的赋值语句中,同时返回generator中yield的变量(结果)。

于是程序便一直运作,直至consumer中获取的n的值变为3!此时consumer把status变为False,最后返回到主程序,主程序中断循环,程序结束。

输出结果:

我拿到了5!

我拿到了4!

我拿到了3!

我只要3,4,5就行啦

程序结束

Coroutine与Generator

有些人会把生成器(generator)和协程(coroutine)的概念混淆,我以前也会这样,不过其实发现,两者的区别还是很大的。

直接上最重要的区别:

  • generator总是生成值,一般是迭代的序列

  • coroutine关注的是消耗值,是数据(data)的消费者

  • coroutine不会与迭代操作关联,而generator会

  • coroutine强调协同控制程序流,generator强调保存状态和产生数据

相似的是,它们都是不用return来实现重复调用的函数/对象,都用到了yield(中断/恢复)的方式来实现。

asyncio

asyncio是python 3.4中新增的模块,它提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。

根据官方说明,asyncio模块主要包括了:

  • 具有特定系统实现的事件循环(event loop);

  • 数据通讯和协议抽象(类似Twisted中的部分);

  • TCP,UDP,SSL,子进程管道,延迟调用和其他;

  • Future类;

  • yield from的支持;

  • 同步的支持;

  • 提供向线程池转移作业的接口;

下面来看下asyncio的一个例子:

import asyncio

 

async def compute(x, y):

    print("Compute %s + %s ..." % (x, y))

    await asyncio.sleep(1.0)

    return x + y

 

async def print_sum(x, y):

    result = await compute(x, y)

    print("%s + %s = %s" % (x, y, result))

 

loop = asyncio.get_event_loop()

loop.run_until_complete(print_sum(1, 2))

loop.close()

2.png

当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了 print_sum() ,因此 print_sum() 被调用,执行 result = await compute(x, y) 这条语句(等同于 result = yield from compute(x, y) ),因为 compute() 自身就是一个coroutine,因此 print_sum() 这个协程就会暂时被挂起, compute() 被加入到事件循环中,程序流执行 compute() 中的print语句,打印”Compute %s + %s …”,然后执行了 await asyncio.sleep(1.0) ,因为 asyncio.sleep() 也是一个coroutine,接着 compute() 就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前 print_sum() 与 compute() 都被挂起了,因此事件循环会停下来等待协程的调度,当计时器读秒结束后,程序流便会返回到 compute() 中执行return语句,结果会返回到 print_sum() 中的result中,最后打印result,事件队列中没有可以调度的任务了,此时 loop.close() 把事件队列关闭,程序结束。

会JS的同学是不是感觉倍感亲切?没错, 事件驱动 模型就是异步编程的重中之重。

最后再通过一个例子,演示 事件驱动 模型的运作原理。

首先,我们用同步的方式,抓取baidu的一百个网页。

def sync_way():

    for i in range(100):

        sock = socket.socket()

        sock.connect(('www.baidu.com', 80))

        print('connected')

        request = 'GET {} HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.format('/s?wd={}'.format(i))

        sock.send(request.encode('ascii'))

        response = b''

        chunk = sock.recv(4096)

        while chunk:

            response += chunk

            chunk = sock.recv(4096)

        print('done!!')

 

from time import time

start = time()

 

sync_way()  #Cost 47.757508993148804 seconds

 

end = time()

print ('Cost {} seconds'.format(end - start))

总共耗时47秒,这对于一个要求性能的爬虫来说是不可接受的,看看我们有没有办法将这个爬虫的性能提高十倍以上,把时间缩短到5秒之内。

首先考虑上面这个程序的瓶颈出在哪个地方,经过思考,很容易看出上面的程序有几个不足之处:

  1. socket连接的建立需要等待,一旦握手建立的时间漫长,就会影响下面的流程正常运行。

  2. socket接收数据的过程是阻塞式的,等待buffer的过程也是需要一段时间的。

  3. socket的建立连接-接收过程都是一个一个来的,在没完成一个连接时不能进行其他连接的处理。

好了,先解决第一个问题:socket的等待。痛点很明显,我们不能一直等待socket的状态发生改变,而是当socket的状态发生改变时,让它告诉我们。要解决这个问题,可以利用io复用,先看看io复用的定义:

IO复用:预先告知内核,使内核一旦发现进程指定的一个或多个IO条件就绪(输入准备被读取,或描述符能承接更多的输出),它就通知进程。

阻塞IO模型看起来是这样的:

recvfrom->无数据报准备好->等待数据->数据报准备好->数据从内核复制到用户空间->复制完成->返回成功指示

而IO复用模型看起来是这样的:

select->无数据报准备好->据报准备好->返回可读条件->recvfrom->数据从内核复制到用户空间->复制完成->返回成功指示

于是我们可以对上面的代码这样修改。

from selectors import DefaultSelector, EVENT_WRITE

 

selector = DefaultSelector()

 

sock = socket.socket()

sock.setblocking(False)

try:

    sock.connect(('www.baidu.com', 80))

except BlockingIOError:

    pass

 

def connected():

    selector.unregister(sock.fileno())

    print('connected!')

    

selector.register(sock.fileno(), EVENT_WRITE, connected)

把socket设置为非阻塞,把socket的句柄注册到事件轮询中,当socket发生可写事件时,表示socket连接就绪了,这时候再把socket从事件轮询中删除,在socket返回可写事件之前,系统都不是阻塞状态的。同理,对于socket从网络中接收数据,也可以用同样的方法,只需要把要监听的事件改为可读事件就行了。

当然,仅仅这样还是不够的,试想一下,如果有多个socket进行连接,采用上面的非阻塞方式,当一个socket开始等待事件返回时,理论上系统此时应该做的是处理另一个socket的流程,但这里还缺乏了一个必要的机制,当从一个处理socket流程切到另一个处理socket流程时,原来的流程的上下文状态该怎么保存下来以便恢复呢,显然易见这里需要用到上面说到的协程机制,在python中通过yield语法可以把一个函数或方法包装成一个生成器,当生成器执行yield语句时,生成器内部的上下文状态就会被保存,如果想要在未来的操作中把这个生成器恢复,只需要调用生成器的send方法即可从原流程中继续往下走。

有了上面这个概念,我们可以创建一个Future类,它代表了协程中等待的“未来发生的结果”,举例来说,在发起网络请求时,socket会在buffer中返回一些数据,这个获取的动作在异步流程中发生的时间是不确定的,Future就是用来封装这个未来结果的类,但当socket在某个时间段监测到可读事件,读取到数据了,那么他就会把数据写入Future里,并告知Future要执行某些回调动作。

class Future:

    def __init__(self):

        self.result = None

        self._callbacks = []

 

    def add_done_callback(self, fn):

        self._callbacks.append(fn)

 

    def set_result(self, result):

        self.result = result

        for callback in self._callbacks:

            callback(self)

有了Future,我们可以包装一个AsyncRequest类,用以发起异步请求的操作。

 

class AsyncRequest:

    def __init__(self, host, url, port, timeout=5):

        self.sock = socket.socket()

        self.sock.settimeout(timeout)

        self.sock.setblocking(False)

        self.host = host

        self.url = url

        self.port = port

        self.method = None

 

    def get(self):

        self.method = 'GET'

        self.request = '{} {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.method, self.url, self.host)

        return self

 

    def process(self):

        if self.method is None:

            self.get()

        try:

            self.sock.connect((self.host, self.port))

        except BlockingIOError:

            pass

        self.f = Future()

        selector.register(self.sock.fileno(),

                      EVENT_WRITE,

                      self.on_connected)

        yield self.f

        selector.unregister(self.sock.fileno())

 

        self.sock.send(self.request.encode('ascii'))

 

        chunk = yield from read_all(self.sock)

        return chunk

 

    def on_connected(self, key, mask):

        self.f.set_result(None)

在AsyncRequest的process方法里,实例在发起异步连接请求后通过yield一个future阻断了程序流,表示他需要等待未来发生的动作发生(在这里是等待socket可写),这时候系统会去执行其他事件,当未来socket变成可写时,future被写入数据,同时执行回调,从原来停下的地方开始执行,执行读取socket数据的处理。

这里关键的地方就是future在yield之后会在未来某个时候再次被send然后继续往下走,这时候就需要一个用来驱动Future的类。这里称为Task,它需要接受一个协程作为参数,并驱动协程的程序流执行。

class Task(Future):

    def __init__(self, coro):

        super().__init__()

        self.coro = coro

        f = Future()

        f.set_result(None)

        self.step(f)

 

    def step(self, future):

        try:

            next_future = self.coro.send(future.result)

            if next_future is None:

                return

        except StopIteration as exc:

            self.set_result(exc.value)

            return

        next_future.add_done_callback(self.step)

最终,整个程序还需要一个EventLoop类,用来监听到来的事件为socket执行回调以及把协程包装成Task来实现异步驱动。

 

class EventLoop:

    stopped = False

    select_timeout = 5

 

    def run_until_complete(self, coros):

        tasks = [Task(coro) for coro in coros]

        try:

            self.run_forever()

        except StopError:

            pass

 

    def run_forever(self):

        while not self.stopped:

            events = selector.select(self.select_timeout)

            if not events:

                raise SelectTimeout('轮询超时')

            for event_key, event_mask in events:

                callback = event_key.data

                callback(event_key, event_mask)

 

    def close(self):

        self.stopped = True

OK,那么现在用新的方法再测试一遍,通过python3的yield from语法我们把协程操作代理到AsyncRequest类的process方法中,最终把协程放到EventLoop中执行。

 

def fetch(url):

    request = AsyncRequest('www.baidu.com', url, 80)

    data = yield from request.process()

    return data

 

def get_page(url):

    page = yield from fetch(url)

    return page

 

def async_way():

    ev_loop = get_event_loop()

    ev_loop.run_until_complete([

        get_page('/s?wd={}'.format(i)) for i in range(100)

    ])

 

from time import time

start = time()

 

async_way() # Cost 3.534296989440918 seconds

 

end = time()

print ('Cost {} seconds'.format(end - start))

可以看到总共耗时3.5秒,通过把同步改写成基于事件驱动的异步,整个程序的效率提高的十倍以上。

 

有了上面的基础,可以更进一步改写出一个的任务队列的异步处理形式,把EventLoop的实现隐藏,提供更简单的接口。

 

from collections import deque

 

class Queue:

    def __init__(self):

        self._q = deque()

        self.size = 0

 

    def put(self, item):

        self.size += 1

        self._q.append(item)

 

    def get(self):

        item = self._q.popleft()

        return item

 

    def task_done(self):

        self.size -= 1

        if self.size == 0:

            self.empty_callback()

 

class AsyncWorker(Queue):

    def __init__(self, coroutine, workers=10, loop_timeout=5):

        super().__init__()

        self.func = coroutine

        self.stopped = False

        self.ev_loop = get_event_loop()

        self.ev_loop.select_timeout = loop_timeout

        self.workers = workers

        self.result_callbacks = []

 

    def work(self):

        def _work():

            while not self.stopped:

                item = None

                try:

                    item = self.get()

                except IndexError:

                    yield None

                result = yield from self.func(item)

                self.task_done()

                for callback in self.result_callbacks:

                    callback(result)

        self.tasks = []

        for _ in range(self.workers):

            self.tasks.append(_work())

        self.ev_loop.run_until_complete(self.tasks)

 

    def add_result_callback(self, func):

        self.result_callbacks.append(func)

 

    def empty_callback(self):

        self.ev_loop.close()

 

def print_content_length(data):

    print(len(data))

 

async_worker = AsyncWorker(get_page, workers=20)

async_worker.add_result_callback(print_content_length)

for i in range(15):

    async_worker.put('/s?wd={}'.format(i))

async_worker.work()

参考文献:

A Web Crawler With asyncio Coroutines