协程Coroutine的概念

协程,多线程,多进程,都是用来实现并发的.

由于现代操作系统的时间片原理,可以将这个概念抽象来看,只要能够满足保存上下文,在中断的地方回来执行的程序,都可以用来实现并发.所以并发的本质就是切换+保存状态,只要能够做到这两点,都可以实现并发.

在单线程内部,不可避免的会有IO操作,如果我们的程序能够编写的让单线程在出现IO操作的时候,转到另外的计算部分继续操作,对于操作系统来讲,我这一个线程就会一直处在不被IO阻塞而就绪进行计算的状态,这也是提高效率的一种办法.

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:
#1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
#2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

之前学过生成器写的生产者消费者模型就是单线程内实现并发,让两个程序来回切换.如果函数用yield来接收参数然后处理,就类似一个协程.而且由于第一个yield无法接收,必须先用内置函数next()启动生成器,这就像多线程和多进程的.start()方法一样,启动一个协程.

# yield模拟协程,实现一个查找功能的例子:
def find(pattern):
    while True:
        search = yield
        if pattern in search:
            print('found {} in {}'.format(pattern, search))
        else:
            print('Not searched')


searchlist = ['cony', 'momisjenny', 'testofcony', "cony's summer", 'minkolovejenny']
a = find('jenny')  # 传入要查找的内容
next(a)  # 启动协程
for i in searchlist:  # 协程等待接收传入的数据,传入数据如果包含要查找的内容就打印找到,否则就打印找不到.
    a.send(i)

可以发现,启动生成器之后,生成器像一个真的线程一样一直在提供服务;主程序不需要使用这个功能模块的时候,find函数完全不起作用,需要用到的时候,只要传入查找内容就可以返回结果,就像去另外一个线程或者进程请求结果一样.但实际上,还是在一个单线程内执行,如果给协程里边加上IO操作(比如添加一个input语句),协程会阻塞,这时候整个线程依然会阻塞,并不像真正的协程,究其原因还是因为yield是无法在自身发生阻塞的情况下跳出的.这就需要用一些模块来帮助在单线程内遇到IO的时候切换执行内容.

greenlet模块

greenlet 用于在程序之间切换

from greenlet import greenlet
import time


def eat():
    print('start eating')
    time.sleep(1)
    g2.switch()
    print('end eating')
    g2.switch()


def play():
    print('start playing')
    time.sleep(1)
    g1.switch()
    print('end playing')
    

g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()

将两个函数都交给greenlet去代理,得到两个对象g1和g2,之后先用一个switch()方法启动g1,就会从头执行eat(),然后执行到g2.switch()的时候会保存一个断点,再到g2执行,第一次跳入g2的时候也是从头开始执行play(),遇到跳回g1的时候,保存这里的断点,然后返回g1上次的断点继续往下执行,又遇到g2.switch(),返回g2的断点,继续执行.这里的例子比较简单,如果用在循环内,可以实现两个程序来回切换.
上边的例子如果用g2.switch()启动,结果就会变成start playing –> start eating –> end playing.
不过这个时候发现,如果在函数内加上IO,依然会阻塞,说明greenlet只是提供了一个顺序流程的改变,还不是真正的协程效果.需要一种可以检测IO状态的模块.gevent模块就是在greenlet的基础上加上了检测IO的功能,实现真正的协程.

gevent模块

import gevent

def eat(name):
    print(name,'start eating')
    gevent.sleep(2)  # 这里用gevent的sleep方法,可以被gevent感知到
    print(name,'end eating')
    return 12

def play(name):
    print(name,'start playing')
    gevent.sleep(2)
    print(name,'end playing')
    return 21


g1=gevent.spawn(eat,name='cony')  # 把代码注册到一个协程里,只用spawn的话无法运行,必须使用join方法才行
g2=gevent.spawn(play,name='jenny')  # 又注册了一个协程
g1.join()  
g2.join()  # 也可以用gevent.joinall([g1,g2])
print(g1.value)  # 取得返回值12
print(g2.value)  # 取得返回值21

由于协程一开始可以异步提交,但执行到最后,肯定要和当前线程内程序最后同步并获得结果,否则就失去了协程的意义.这个模块不能够感知time.sleep(),但是可以感知自己的sleep()方法,在发现sleep了之后,就会切换执行其他gevent对象内的程序.time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用到下边代码from gevent import monkey;monkey.patch_all()(写在一行内),放到要导入的模块之前,最好放在文件开头的地方就行了.修改后代码如下:

from gevent import monkey
import gevent
import time

monkey.patch_all()


def eat(name):
    print(name, 'start eating')
    time.sleep(2)
    print(name, 'end eating')
    return 10


def play(name):
    print(name, 'start playing')
    time.sleep(2)
    print(name, 'end playing')
    return 11


g1 = gevent.spawn(eat, name='cony')
g2 = gevent.spawn(play, name='jenny')
g1.join()
g2.join()
print(g1.value)
print(g2.value)

协程模块遇到能够识别的IO操作的时候,才会进行任务切换,实现并发的效果.

线程是CPU调度的最小单位,所以协程实际上操作系统是不管的,是由用户自行调度的.协程本质上是在一个线程内部来回切换.有效利用协程可以提高单线程的效率.

gevent模块应用

# 模拟高IO场景
from gevent import monkey;monkey.patch_all()
import gevent
import time

def task(i):
    time.sleep(0.5)
    print(12345,i)

def sync():
    for i in range(10):
        task(i)

def async():
    t_list = [gevent.spawn(task,i) for i in range(10)]
    gevent.joinall(t_list)

sync()
async()

从执行结果来看,显然异步的效率高,协程适合单线程内IO操作频繁的时候,所以协程很适合用在网络IO方面的处理.

# 轻型爬虫
from gevent import monkey

monkey.patch_all()
import gevent
import requests


def get_page(url, filename):
    print('GET: %s' % url)
    reponse = requests.get(url)
    if reponse.status_code == 200:
        with open(filename, 'w') as f:
            f.write(reponse.content.decode('utf-8'))
        print('{}写入完成'.format(url))


url_list = ['http://www.conyli.cc',
            'http://www.baidu.com',
            'http://github.com',
            'http://bbs.saraba1st.com/2b/']

filename = 'html'
res_list = []
for i in range(len(url_list)):
    res_list.append(gevent.spawn(get_page, url_list[i], filename + str(i) + '.html'))
gevent.joinall(res_list)

爬取内容各个网址的内容写入到文件里.从结果里可以看到,写入完成并不是按照启动的顺序,而是先爬完的先读写.这样比同步执行各个操作要节省很多时间.
学到这里,可以看到,单线程内部可以有协程,多线程,还有多进程,因此如果有大规模的并行任务,理论上是可以开i个进程,每个进程中有j个线程,每个线程中有k个协程,也就是一共i*j*k数量的任务运行.一般在现代操作系统中,4核CPU的这个总数量可以支持50000个.

协程实现单线程的socketserver并发

# 思路和之前多线程或者多进程的方式类似,用一个循环不断监听socket对象的accpet()阻塞,每生成一个连接对象,就用handler方法注册到新的协程里去处理这个连接对象.
from gevent import monkey

monkey.patch_all()
import gevent
import socket

ip_port = ('127.0.0.1', 8080)
buffer_size = 1024
backlog = 5


def get_conn():
    while True:
        conn, addr = sk.accept()
        gevent.spawn(handler, conn, addr)


def handler(conn, addr):
    print('From {} connected'.format(addr))
    try:
        while True:
            data = conn.recv(buffer_size)
            # print('From {}: {}'.format(addr, data.decode('utf-8')))
            conn.send(data.upper())
    except Exception:
        print('From {} connect lost'.format(addr))
        conn.close()


sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(ip_port)
sk.listen(backlog)
get_conn()

多进程,多线程,协程都实现过socketserver了,再回头看socketserver模块的源码,其实就是用多线程Threading实现的.这里有篇文章提到了说一般可以考虑多进程+协程的方式工作,又可以跑满CPU,又不用考虑多线程加锁问题.另外,gevent还有协程池pool等方法,留待以后再来研究了.