你所需要的,不仅仅是一个好用的代理。
协程,又称作 Coroutine 。从字面上来理解,即协同运行的例程,它是比是线程(thread)更细量级的用户态线程,特点是允许用户的主动调用和主动退出,挂起当前的例程然后返回值或去执行其他任务,接着返回原来停下的点继续执行。等下,这是否有点奇怪?我们都知道一般函数都是线性执行的,不可能说执行到一半返回,等会儿又跑到原来的地方继续执行。但一些熟悉python(or其他动态语言)的童鞋都知道这可以做到,答案是用yield语句。其实这里我们要感谢操作系统(OS)为我们做的工作,因为它具有getcontext和swapcontext这些特性,通过系统调用,我们可以把上下文和状态保存起来,切换到其他的上下文,这些特性为coroutine的实现提供了底层的基础。操作系统的Interrupts和Traps机制则为这种实现提供了可能性,因此它看起来可能是下面这样的:
理解生成器(generator)
学过生成器和迭代器的同学应该都知道python有yield这个关键字,yield能把一个函数变成一个generator,与return不同,yield在函数中返回值时会保存函数的状态,使下一次调用函数时会从上一次的状态继续执行,即从yield的下一条语句开始执行,这样做有许多好处,比如我们想要生成一个数列,若该数列的存储空间太大,而我们仅仅需要访问前面几个元素,那么yield就派上用场了,它实现了这种一边循环一边计算的机制,节省了存储空间,提高了运行效率。
这里以斐波那契数列为例:
def fib(max):
n, a, b = 0, 0, 1
while n max:
print b
a, b = b, a + b
n = n + 1
如果使用上述的算法,那么我每一次调用函数时,都要耗费大量时间循环做重复的事情。而如果使用yield的话,它则会生成一个generator,当我需要时,调用它的next方法获得下一个值,改动的方法很简单,直接把print改为yield就OK。
生产者-消费者的协程
#-*- coding:utf-8
def consumer():
status = True
while True:
n = yield status
print("我拿到了{}!".format(n))
if n == 3:
status = False
def producer(consumer):
n = 5
while n > 0:
# yield给主程序返回消费者的状态
yield consumer.send(n)
n -= 1
if __name__ == '__main__':
c = consumer()
c.send(None)
p = producer(c)
for status in p:
if status == False:
print("我只要3,4,5就行啦")
break
print("程序结束")
上面这个例子是典型的生产者-消费者问题,我们用协程的方式来实现它。首先从主程序中开始看,第一句c = consumer(),因为consumer函数中存在yield语句,python会把它当成一个generator(生成器,注意:生成器和协程的概念区别很大,千万别混淆了两者),因此在运行这条语句后,python并不会像执行函数一样,而是返回了一个generator object。
再看第二条语句c.send(None),这条语句的作用是将consumer(即变量c,它是一个generator)中的语句推进到第一个yield语句出现的位置,那么在例子中,consumer中的status = True和while True:都已经被执行了,程序停留在n = yield status的位置(注意:此时这条语句还没有被执行),上面说的send(None)语句十分重要,如果漏写这一句,那么程序直接报错,这个send()方法看上去似乎挺神奇,等下再讲它的作用。
下面第三句p = producer(c),这里则像上面一样定义了producer的生成器,注意的是这里我们传入了消费者的生成器,来让producer跟consumer通信。
第四句for status in p:,这条语句会循环地运行producer和获取它yield回来的状态。
好了,进入正题, 现在我们要让生产者发送1,2,3,4,5给消费者,消费者接受数字,返回状态给生产者,而我们的消费者只需要3,4,5就行了,当数字等于3时,会返回一个错误的状态。最终我们需要由主程序来监控生产者-消费者的过程状态,调度结束程序。
现在程序流进入了producer里面,我们直接看yield consumer.send(n),生产者调用了消费者的send()方法,把n发送给consumer(即c),在consumer中的n = yield status,n拿到的是消费者发送的数字,同时,consumer用yield的方式把状态(status)返回给消费者,注意:这时producer(即消费者)的consumer.send()调用返回的就是consumer中yield的status!消费者马上将status返回给调度它的主程序,主程序获取状态,判断是否错误,若错误,则终止循环,结束程序。上面看起来有点绕,其实这里面generator.send(n)的作用是:把n发送generator(生成器)中yield的赋值语句中,同时返回generator中yield的变量(结果)。
于是程序便一直运作,直至consumer中获取的n的值变为3!此时consumer把status变为False,最后返回到主程序,主程序中断循环,程序结束。
输出结果:
我拿到了5!
我拿到了4!
我拿到了3!
我只要3,4,5就行啦
程序结束
Coroutine与Generator
有些人会把生成器(generator)和协程(coroutine)的概念混淆,我以前也会这样,不过其实发现,两者的区别还是很大的。
直接上最重要的区别:
generator总是生成值,一般是迭代的序列
coroutine关注的是消耗值,是数据(data)的消费者
coroutine不会与迭代操作关联,而generator会
coroutine强调协同控制程序流,generator强调保存状态和产生数据
相似的是,它们都是不用return来实现重复调用的函数/对象,都用到了yield(中断/恢复)的方式来实现。
asyncio
asyncio是python 3.4中新增的模块,它提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。
根据官方说明,asyncio模块主要包括了:
具有特定系统实现的事件循环(event loop);
数据通讯和协议抽象(类似Twisted中的部分);
TCP,UDP,SSL,子进程管道,延迟调用和其他;
Future类;
yield from的支持;
同步的支持;
提供向线程池转移作业的接口;
下面来看下asyncio的一个例子:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了 print_sum() ,因此 print_sum() 被调用,执行 result = await compute(x, y) 这条语句(等同于 result = yield from compute(x, y) ),因为 compute() 自身就是一个coroutine,因此 print_sum() 这个协程就会暂时被挂起, compute() 被加入到事件循环中,程序流执行 compute() 中的print语句,打印”Compute %s + %s …”,然后执行了 await asyncio.sleep(1.0) ,因为 asyncio.sleep() 也是一个coroutine,接着 compute() 就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前 print_sum() 与 compute() 都被挂起了,因此事件循环会停下来等待协程的调度,当计时器读秒结束后,程序流便会返回到 compute() 中执行return语句,结果会返回到 print_sum() 中的result中,最后打印result,事件队列中没有可以调度的任务了,此时 loop.close() 把事件队列关闭,程序结束。
会JS的同学是不是感觉倍感亲切?没错, 事件驱动 模型就是异步编程的重中之重。
最后再通过一个例子,演示 事件驱动 模型的运作原理。
首先,我们用同步的方式,抓取baidu的一百个网页。
def sync_way():
for i in range(100):
sock = socket.socket()
sock.connect(('www.baidu.com', 80))
print('connected')
request = 'GET {} HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.format('/s?wd={}'.format(i))
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
print('done!!')
from time import time
start = time()
sync_way() #Cost 47.757508993148804 seconds
end = time()
print ('Cost {} seconds'.format(end - start))
总共耗时47秒,这对于一个要求性能的爬虫来说是不可接受的,看看我们有没有办法将这个爬虫的性能提高十倍以上,把时间缩短到5秒之内。
首先考虑上面这个程序的瓶颈出在哪个地方,经过思考,很容易看出上面的程序有几个不足之处:
socket连接的建立需要等待,一旦握手建立的时间漫长,就会影响下面的流程正常运行。
socket接收数据的过程是阻塞式的,等待buffer的过程也是需要一段时间的。
socket的建立连接-接收过程都是一个一个来的,在没完成一个连接时不能进行其他连接的处理。
好了,先解决第一个问题:socket的等待。痛点很明显,我们不能一直等待socket的状态发生改变,而是当socket的状态发生改变时,让它告诉我们。要解决这个问题,可以利用io复用,先看看io复用的定义:
IO复用:预先告知内核,使内核一旦发现进程指定的一个或多个IO条件就绪(输入准备被读取,或描述符能承接更多的输出),它就通知进程。
阻塞IO模型看起来是这样的:
recvfrom->无数据报准备好->等待数据->数据报准备好->数据从内核复制到用户空间->复制完成->返回成功指示
而IO复用模型看起来是这样的:
select->无数据报准备好->据报准备好->返回可读条件->recvfrom->数据从内核复制到用户空间->复制完成->返回成功指示
于是我们可以对上面的代码这样修改。
from selectors import DefaultSelector, EVENT_WRITE
selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('www.baidu.com', 80))
except BlockingIOError:
pass
def connected():
selector.unregister(sock.fileno())
print('connected!')
selector.register(sock.fileno(), EVENT_WRITE, connected)
把socket设置为非阻塞,把socket的句柄注册到事件轮询中,当socket发生可写事件时,表示socket连接就绪了,这时候再把socket从事件轮询中删除,在socket返回可写事件之前,系统都不是阻塞状态的。同理,对于socket从网络中接收数据,也可以用同样的方法,只需要把要监听的事件改为可读事件就行了。
当然,仅仅这样还是不够的,试想一下,如果有多个socket进行连接,采用上面的非阻塞方式,当一个socket开始等待事件返回时,理论上系统此时应该做的是处理另一个socket的流程,但这里还缺乏了一个必要的机制,当从一个处理socket流程切到另一个处理socket流程时,原来的流程的上下文状态该怎么保存下来以便恢复呢,显然易见这里需要用到上面说到的协程机制,在python中通过yield语法可以把一个函数或方法包装成一个生成器,当生成器执行yield语句时,生成器内部的上下文状态就会被保存,如果想要在未来的操作中把这个生成器恢复,只需要调用生成器的send方法即可从原流程中继续往下走。
有了上面这个概念,我们可以创建一个Future类,它代表了协程中等待的“未来发生的结果”,举例来说,在发起网络请求时,socket会在buffer中返回一些数据,这个获取的动作在异步流程中发生的时间是不确定的,Future就是用来封装这个未来结果的类,但当socket在某个时间段监测到可读事件,读取到数据了,那么他就会把数据写入Future里,并告知Future要执行某些回调动作。
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for callback in self._callbacks:
callback(self)
有了Future,我们可以包装一个AsyncRequest类,用以发起异步请求的操作。
class AsyncRequest:
def __init__(self, host, url, port, timeout=5):
self.sock = socket.socket()
self.sock.settimeout(timeout)
self.sock.setblocking(False)
self.host = host
self.url = url
self.port = port
self.method = None
def get(self):
self.method = 'GET'
self.request = '{} {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.method, self.url, self.host)
return self
def process(self):
if self.method is None:
self.get()
try:
self.sock.connect((self.host, self.port))
except BlockingIOError:
pass
self.f = Future()
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.on_connected)
yield self.f
selector.unregister(self.sock.fileno())
self.sock.send(self.request.encode('ascii'))
chunk = yield from read_all(self.sock)
return chunk
def on_connected(self, key, mask):
self.f.set_result(None)
在AsyncRequest的process方法里,实例在发起异步连接请求后通过yield一个future阻断了程序流,表示他需要等待未来发生的动作发生(在这里是等待socket可写),这时候系统会去执行其他事件,当未来socket变成可写时,future被写入数据,同时执行回调,从原来停下的地方开始执行,执行读取socket数据的处理。
这里关键的地方就是future在yield之后会在未来某个时候再次被send然后继续往下走,这时候就需要一个用来驱动Future的类。这里称为Task,它需要接受一个协程作为参数,并驱动协程的程序流执行。
class Task(Future):
def __init__(self, coro):
super().__init__()
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
next_future = self.coro.send(future.result)
if next_future is None:
return
except StopIteration as exc:
self.set_result(exc.value)
return
next_future.add_done_callback(self.step)
最终,整个程序还需要一个EventLoop类,用来监听到来的事件为socket执行回调以及把协程包装成Task来实现异步驱动。
class EventLoop:
stopped = False
select_timeout = 5
def run_until_complete(self, coros):
tasks = [Task(coro) for coro in coros]
try:
self.run_forever()
except StopError:
pass
def run_forever(self):
while not self.stopped:
events = selector.select(self.select_timeout)
if not events:
raise SelectTimeout('轮询超时')
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
def close(self):
self.stopped = True
OK,那么现在用新的方法再测试一遍,通过python3的yield from语法我们把协程操作代理到AsyncRequest类的process方法中,最终把协程放到EventLoop中执行。
def fetch(url):
request = AsyncRequest('www.baidu.com', url, 80)
data = yield from request.process()
return data
def get_page(url):
page = yield from fetch(url)
return page
def async_way():
ev_loop = get_event_loop()
ev_loop.run_until_complete([
get_page('/s?wd={}'.format(i)) for i in range(100)
])
from time import time
start = time()
async_way() # Cost 3.534296989440918 seconds
end = time()
print ('Cost {} seconds'.format(end - start))
可以看到总共耗时3.5秒,通过把同步改写成基于事件驱动的异步,整个程序的效率提高的十倍以上。
有了上面的基础,可以更进一步改写出一个的任务队列的异步处理形式,把EventLoop的实现隐藏,提供更简单的接口。
from collections import deque
class Queue:
def __init__(self):
self._q = deque()
self.size = 0
def put(self, item):
self.size += 1
self._q.append(item)
def get(self):
item = self._q.popleft()
return item
def task_done(self):
self.size -= 1
if self.size == 0:
self.empty_callback()
class AsyncWorker(Queue):
def __init__(self, coroutine, workers=10, loop_timeout=5):
super().__init__()
self.func = coroutine
self.stopped = False
self.ev_loop = get_event_loop()
self.ev_loop.select_timeout = loop_timeout
self.workers = workers
self.result_callbacks = []
def work(self):
def _work():
while not self.stopped:
item = None
try:
item = self.get()
except IndexError:
yield None
result = yield from self.func(item)
self.task_done()
for callback in self.result_callbacks:
callback(result)
self.tasks = []
for _ in range(self.workers):
self.tasks.append(_work())
self.ev_loop.run_until_complete(self.tasks)
def add_result_callback(self, func):
self.result_callbacks.append(func)
def empty_callback(self):
self.ev_loop.close()
def print_content_length(data):
print(len(data))
async_worker = AsyncWorker(get_page, workers=20)
async_worker.add_result_callback(print_content_length)
for i in range(15):
async_worker.put('/s?wd={}'.format(i))
async_worker.work()
参考文献: