你所需要的,不仅仅是一个好用的代理。
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程,协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法,进入上一次离开时所处逻辑流的位置。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后A执行完毕。
所以子程序调用时通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:
def a():
print(" 1 ")
print(" 2 ")
print(" 3 ")
def b():
print(" x ")
print(" y ")
print(" z ")
假设由程序执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
1
2
x
y
3
z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。看起来A、B的执行有点像多线程,但协程的特点在是一个线程执行,和多线程比协程有何优势?
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是有程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那么怎么利用多核CPU呢?最简单的方法是多进程加协程,既充分利用多核,有充分发挥协程的高效率,可获得极高的性能。
协程的优点:
无需线程上下文切换的开销。
无需原子操作锁定及同步的开销。原子操作(atomic operation)是不需要synchronized,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
方便切换控制流,简化编程模型。
高并发+高扩展性+低成本。一个CPU支持上万的协程都不是问题,所以很适合用于高并发处理。
协程的缺点:
无法利用多核资源。协程的本质是个单线程,它不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。
使用yield实现协程操作。
import time,queue
def consumer(name):
print(" -->starting eating xoxo ")
while True:
new_xo = yield
print(" %s is eating xoxo %s "%(name,new_xo))
def producer():
r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n += 1
con.send(n)
con2.send(n)
print(" \033[32;1mproducer\033[0m is making xoxo %s "%n)
if __name__ == " __main__ ":
con = consumer(" c1 ")
con2 = consumer(" c2 ")
p = producer()
输出:
-->starting eating xoxo
-->starting eating xoxo
c1 is eating xoxo 1
c2 is eating xoxo 1
producer is making xoxo 1
c1 is eating xoxo 2
c2 is eating xoxo 2
producer is making xoxo 2
c1 is eating xoxo 3
c2 is eating xoxo 3
producer is making xoxo 3
c1 is eating xoxo 4
c2 is eating xoxo 4
producer is making xoxo 4
c1 is eating xoxo 5
c2 is eating xoxo 5
producer is making xoxo 5
协程的特点:
1、必须在只有一个单线程里实现并发。
2、修改共享数据不需加锁。
3、用户程序里自己保持多个控制流的上下文栈。
4、一个协程遇到IO操作自动切换到其它协程。
刚才yield实现的不能算是合格的协程。
Python对协程的支持是通过generator实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回到下一个值。但是python的yield不但可以返回一个值,它可以接收调用者发出的参数。
greenlet是一个用C实现的协程模块,相比于Python自带的yield,它可以在任意函数之间随意切换,而不需把这个函数声明为generator。
from greenlet import greenlet
def f1():
print(11)
gr2. switch ()
print(22)
gr2. switch ()
def f2():
print(33)
gr1. switch ()
print(44)
gr1 = greenlet(f1)
gr2 = greenlet(f2)
gr1. switch ()
输出:
11
33
22
44
以上例子还有一个问题没有解决,就是遇到IO操作自动切换。
Gevent是一个第三方库,可以轻松提供gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
import gevent
def foo():
print(" Running in foo ")
gevent.sleep()
print(" Explicit contenxt switch to foo agin ")
def bar():
print(" Explicit context to bar ")
gevent.sleep(1)
print(" Implict context switch back to bar ")
def func3():
print(" running func3 ")
gevent.sleep(0)
print(" running func3 again ")
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
gevent.spawn(func3),
])
输出:
Running in foo
Explicit context to bar
running func3
Explicit contenxt switch to foo agin
running func3 again
Implict context switch back to bar
import gevent
def f1(pid):
gevent.sleep(0.5)
print(" F1 %s done "%pid)
def f2():
for i in range(10):
f1(i)
def f3():
threads = [gevent.spawn(f1,i) for i in range(10)]
gevent.joinall(threads)
print(" f2 ")
f2()
print(" f3 ")
f3()
输出:
f2
F1 0 done
F1 1 done
F1 2 done
F1 3 done
F1 4 done
F1 5 done
F1 6 done
F1 7 done
F1 8 done
F1 9 done
f3
F1 0 done
F1 4 done
F1 8 done
F1 7 done
F1 6 done
F1 5 done
F1 1 done
F1 3 done
F1 2 done
F1 9 done
上面程序的重要部分是将f1函数封装到Greenlet内部线程的gevent.spawn。初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在所有greenlet执行完后才会继续向下走。
from urllib import request
import gevent,time
from gevent import monkey
# 把当前程序的所有的id操作给单独的做上标记
monkey.patch_all()
def f(url):
print(" GET:%s "%url)
resp = request.urlopen(url)
data = resp.read()
f = open(" load.txt "," wb ")
f.write(data)
f.close()
print(" %d bytes received from %s. "%(len(data),url))
urls = ['https: //www.python.org/',
'http: //www.cnblogs.com/yinshoucheng-golden/',
'https: //github.com/']
time_start = time.time()
for url in urls:
f(url)
print(" 同步cost ",time.time() - time_start)
async_time_start = time.time()
gevent.joinall([
gevent.spawn(f,'https: //www.python.org/'),
gevent.spawn(f,'http: //www.cnblogs.com/yinshoucheng-golden/'),
gevent.spawn(f,'https: //github.com/'),
])
print(" 异步cost ",time.time() - async_time_start)
import sys,socket,time,gevent
from gevent import socket,monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind((" 0.0.0.0 ",port))
s.listen(500)
while True:
cli,addr = s.accept()
gevent.spawn(handle_request,cli)
def handle_request(conn):
try :
while True:
data = conn.recv(1024)
print(" recv: ",data)
if not data:
conn.shutdown(socket.SHUT_WR)
conn.send(data)
except Exception as ex:
print(ex)
finally :
conn.close()
if __name__ == " __main__ ":
server(6969)
import socket
HOST = " localhost "
PORT = 6969
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((HOST,PORT))
while True:
msg = bytes(input(" >>: "),encoding=" utf8 ")
s.sendall(msg)
data = s.recv(1024)
# print(data)
print(" Received ",repr(data))
s.close()
import socket,threading
def sock_conn():
client = socket.socket()
client.connect((" localhost ",6969))
count = 0
while True:
client.send((" hello %s "%count).encode(" utf-8 "))
data = client.recv(1024)
print(" %s from server:%s "%(threading.get_ident(),data.decode()))
count += 1
client.close()
for i in range(100):
t = threading.Thread(target=sock_conn)
t.start()