协程Coroutine的概念
协程,多线程,多进程,都是用来实现并发的.
由于现代操作系统的时间片原理,可以将这个概念抽象来看,只要能够满足保存上下文,在中断的地方回来执行的程序,都可以用来实现并发.所以并发的本质就是切换+保存状态,只要能够做到这两点,都可以实现并发.
在单线程内部,不可避免的会有IO操作,如果我们的程序能够编写的让单线程在出现IO操作的时候,转到另外的计算部分继续操作,对于操作系统来讲,我这一个线程就会一直处在不被IO阻塞而就绪进行计算的状态,这也是提高效率的一种办法.
协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:
#1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
#2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
之前学过生成器写的生产者消费者模型就是单线程内实现并发,让两个程序来回切换.如果函数用yield来接收参数然后处理,就类似一个协程.而且由于第一个yield无法接收,必须先用内置函数next()启动生成器,这就像多线程和多进程的.start()方法一样,启动一个协程.
# yield模拟协程,实现一个查找功能的例子: def find(pattern): while True: search = yield if pattern in search: print('found {} in {}'.format(pattern, search)) else: print('Not searched') searchlist = ['cony', 'momisjenny', 'testofcony', "cony's summer", 'minkolovejenny'] a = find('jenny') # 传入要查找的内容 next(a) # 启动协程 for i in searchlist: # 协程等待接收传入的数据,传入数据如果包含要查找的内容就打印找到,否则就打印找不到. a.send(i)
可以发现,启动生成器之后,生成器像一个真的线程一样一直在提供服务;主程序不需要使用这个功能模块的时候,find函数完全不起作用,需要用到的时候,只要传入查找内容就可以返回结果,就像去另外一个线程或者进程请求结果一样.但实际上,还是在一个单线程内执行,如果给协程里边加上IO操作(比如添加一个input语句),协程会阻塞,这时候整个线程依然会阻塞,并不像真正的协程,究其原因还是因为yield是无法在自身发生阻塞的情况下跳出的.这就需要用一些模块来帮助在单线程内遇到IO的时候切换执行内容.
greenlet模块
greenlet 用于在程序之间切换
from greenlet import greenlet import time def eat(): print('start eating') time.sleep(1) g2.switch() print('end eating') g2.switch() def play(): print('start playing') time.sleep(1) g1.switch() print('end playing') g1 = greenlet(eat) g2 = greenlet(play) g1.switch()
将两个函数都交给greenlet去代理,得到两个对象g1和g2,之后先用一个switch()方法启动g1,就会从头执行eat(),然后执行到g2.switch()的时候会保存一个断点,再到g2执行,第一次跳入g2的时候也是从头开始执行play(),遇到跳回g1的时候,保存这里的断点,然后返回g1上次的断点继续往下执行,又遇到g2.switch(),返回g2的断点,继续执行.这里的例子比较简单,如果用在循环内,可以实现两个程序来回切换.
上边的例子如果用g2.switch()启动,结果就会变成start playing –> start eating –> end playing.
不过这个时候发现,如果在函数内加上IO,依然会阻塞,说明greenlet只是提供了一个顺序流程的改变,还不是真正的协程效果.需要一种可以检测IO状态的模块.gevent模块就是在greenlet的基础上加上了检测IO的功能,实现真正的协程.
gevent模块
import gevent def eat(name): print(name,'start eating') gevent.sleep(2) # 这里用gevent的sleep方法,可以被gevent感知到 print(name,'end eating') return 12 def play(name): print(name,'start playing') gevent.sleep(2) print(name,'end playing') return 21 g1=gevent.spawn(eat,name='cony') # 把代码注册到一个协程里,只用spawn的话无法运行,必须使用join方法才行 g2=gevent.spawn(play,name='jenny') # 又注册了一个协程 g1.join() g2.join() # 也可以用gevent.joinall([g1,g2]) print(g1.value) # 取得返回值12 print(g2.value) # 取得返回值21
由于协程一开始可以异步提交,但执行到最后,肯定要和当前线程内程序最后同步并获得结果,否则就失去了协程的意义.这个模块不能够感知time.sleep(),但是可以感知自己的sleep()方法,在发现sleep了之后,就会切换执行其他gevent对象内的程序.time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用到下边代码from gevent import monkey;monkey.patch_all()(写在一行内),放到要导入的模块之前,最好放在文件开头的地方就行了.修改后代码如下:
from gevent import monkey import gevent import time monkey.patch_all() def eat(name): print(name, 'start eating') time.sleep(2) print(name, 'end eating') return 10 def play(name): print(name, 'start playing') time.sleep(2) print(name, 'end playing') return 11 g1 = gevent.spawn(eat, name='cony') g2 = gevent.spawn(play, name='jenny') g1.join() g2.join() print(g1.value) print(g2.value)
协程模块遇到能够识别的IO操作的时候,才会进行任务切换,实现并发的效果.
线程是CPU调度的最小单位,所以协程实际上操作系统是不管的,是由用户自行调度的.协程本质上是在一个线程内部来回切换.有效利用协程可以提高单线程的效率.
gevent模块应用
# 模拟高IO场景 from gevent import monkey;monkey.patch_all() import gevent import time def task(i): time.sleep(0.5) print(12345,i) def sync(): for i in range(10): task(i) def async(): t_list = [gevent.spawn(task,i) for i in range(10)] gevent.joinall(t_list) sync() async()
从执行结果来看,显然异步的效率高,协程适合单线程内IO操作频繁的时候,所以协程很适合用在网络IO方面的处理.
# 轻型爬虫 from gevent import monkey monkey.patch_all() import gevent import requests def get_page(url, filename): print('GET: %s' % url) reponse = requests.get(url) if reponse.status_code == 200: with open(filename, 'w') as f: f.write(reponse.content.decode('utf-8')) print('{}写入完成'.format(url)) url_list = ['http://www.conyli.cc', 'http://www.baidu.com', 'http://github.com', 'http://bbs.saraba1st.com/2b/'] filename = 'html' res_list = [] for i in range(len(url_list)): res_list.append(gevent.spawn(get_page, url_list[i], filename + str(i) + '.html')) gevent.joinall(res_list)
爬取内容各个网址的内容写入到文件里.从结果里可以看到,写入完成并不是按照启动的顺序,而是先爬完的先读写.这样比同步执行各个操作要节省很多时间.
学到这里,可以看到,单线程内部可以有协程,多线程,还有多进程,因此如果有大规模的并行任务,理论上是可以开i个进程,每个进程中有j个线程,每个线程中有k个协程,也就是一共i*j*k数量的任务运行.一般在现代操作系统中,4核CPU的这个总数量可以支持50000个.
协程实现单线程的socketserver并发
# 思路和之前多线程或者多进程的方式类似,用一个循环不断监听socket对象的accpet()阻塞,每生成一个连接对象,就用handler方法注册到新的协程里去处理这个连接对象. from gevent import monkey monkey.patch_all() import gevent import socket ip_port = ('127.0.0.1', 8080) buffer_size = 1024 backlog = 5 def get_conn(): while True: conn, addr = sk.accept() gevent.spawn(handler, conn, addr) def handler(conn, addr): print('From {} connected'.format(addr)) try: while True: data = conn.recv(buffer_size) # print('From {}: {}'.format(addr, data.decode('utf-8'))) conn.send(data.upper()) except Exception: print('From {} connect lost'.format(addr)) conn.close() sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sk.bind(ip_port) sk.listen(backlog) get_conn()
多进程,多线程,协程都实现过socketserver了,再回头看socketserver模块的源码,其实就是用多线程Threading实现的.这里有篇文章提到了说一般可以考虑多进程+协程的方式工作,又可以跑满CPU,又不用考虑多线程加锁问题.另外,gevent还有协程池pool等方法,留待以后再来研究了.