进程间通信叫做 IPC (Inter-Process Communication)
进程间通信通过multiprocessing中的Queue(队列)和Pipe(管道)模块来实现.
multiprocessing.Queue 队列
队列是先进先出的,multiprocessing模块里的Queue是一个多进程安全的队列对象,几乎就是queue.Queue的克隆.
from multiprocessing import Queue if __name__ == '__main__': q = Queue(5) # 实例化队列对象,参数表示队列的容纳元素的多少.省略则表示无大小限制. q.put(1) # 向队列中增加一个元素,如果此时超过队列容量,默认会阻塞 q.get() # 从队列中取出一个元素,如果此时队列为空,默认会阻塞 q.full() # 判断队列是否已满 q.empty() # 判断队列是否已空
将队列对象作为参数传入各个进程,即可实现进程间通信.
这里有一篇介绍Queue的文章,官方文档在此
put和get方法还有各自的put_nowait和get_nowait方法,等于各自的方法里设置false,这样就不会阻塞
但是空的时候get_nowait会报错,而队列满的时候,put_nowait会报错并失去put的值.这样可以捕获异常然后进行后续处理,就不用一直阻塞在那里等待队列.
在一般的数据处理中,生成数据的速度一般比处理比较快,比如在爬虫的实际运行环境中,一般采取多线程进行爬取数据,但是对数据的处理比较慢,就可以将爬取的数据暂时放入一个队列中,然后慢慢处理.或者在数据彼此不相关的情况下,可以增加处理者的线程,同时处理多个数据:
import time,random from multiprocessing import Process from multiprocessing import Queue def producer(name,food, q): for i in range(10): time.sleep(random.randint(1,2)) f = '{}生产的{}号{}'.format(name,i, food) q.put(f) print('{}生产了{}号{}'.format(name,i, food)) def consumer(name,q): while True: try: k = q.get(timeout=5) if k is not None: print(' {}吃了{}'.format(name,k)) time.sleep(2) else: print('{}没东西吃了'.format(name)) break except Exception: print(' {}等了这么久也没东西吃,看来是没了'.format(name)) break if __name__ == '__main__': queue = Queue() p1 = Process(target=producer, args=('吉祥', '包子',queue,)) p2 = Process(target=producer, args=('红宝石', '蛋糕',queue,)) p3 = Process(target=producer, args=('五芳斋', '粽子',queue,)) p1.start() p2.start() p3.start() c1 = Process(target=consumer,args=('Jenny',queue,)) c2 = Process(target=consumer,args=('Cony',queue,)) c1.start() c2.start() p1.join() p2.join() queue.put(None)
这个模型初步建立,用到的一个关键是,要在主进程内跟踪所以数据生产者的状态,保证全部生产完毕之后,给队列一个生产完毕的信号.目前的主要问题是,消费者很难判断生产是否结束,现在的做法是向队列里放一个None,子进程拿到None后正常退出,然后使用了一个长时间的超时来判断结束并退出,有没有更好的解决办法呢?
JoinableQueue,每次从队列里获取一个数据的时候,需要向队列提交一个回执,叫做q.taskdone().在每次向JoinableQueue put数据的时候,会有个计数器去加1,提交taskdone的时候,这个计数器会-1.这个兑队列本身也可以.join()来感知这个队列中的数据全部被执行完毕,这样就可以通过这个队列对象来
import time,random from multiprocessing import Process from multiprocessing import JoinableQueue def producer(name,food, q): for i in range(10): time.sleep(random.randint(1,2)) f = '{}生产的{}号{}'.format(name,i, food) q.put(f) print('{}生产了{}号{}'.format(name,i, food)) q.join() # 阻塞,直到一个队列中的所有数据全部处理完毕,等于一直在等着消费者拿了包子吃完才结束 def consumer(name,q): while True: food = q.get() if food is None: print('%s获取到了一个空' %name) break print('\033[31m%s消费了%s\033[0m' %(name,food)) time.sleep(random.randint(1,3)) q.task_done() # 减少一个计数,get方法并不减少计数,put方法会增加计数.这样就会和生产者进程最后同步,大家一起结束. if __name__ == '__main__': queue = JoinableQueue() p1 = Process(target=producer, args=('吉祥', '包子',queue,)) p2 = Process(target=producer, args=('红宝石', '蛋糕',queue,)) p3 = Process(target=producer, args=('五芳斋', '粽子',queue,)) p1.start() p2.start() p3.start() c1 = Process(target=consumer,args=('Jenny',queue,)) c2 = Process(target=consumer,args=('Cony',queue,)) c1.daemon = True # 设置为守护进程,主进程代码执行完毕就结束 c2.daemon = True # 设置为守护进程,主进程代码执行完毕就结束 c1.start() c2.start() p1.join() p2.join()
这里的流程是这样: 主进程启动各个子进程后,就通过最后两行join等待所有生产者结束,然后每个生产者,又通过队列等待消费者结束.而消费者又是守护进程,会在主进程结束的时候结束.
等到消费者吃光了所有队列里的东西,并且task_done通知队列全部活干完了,生产者就会从阻塞的地方结束,此时主进程等到了生产者结束,也执行到了最后的代码,然后消费者守护进程结束,就在工作全部处理完毕的情况下,关闭了所有进程.这样,可以增加任意多的消费者和生产者,由于队列是多线程安全的(自带锁),这些消费者和生产者互相不冲突.
# 任意多个生产者消费者的模型 import time, random from multiprocessing import Process from multiprocessing import JoinableQueue def producer(name, food, q): for k in range(10): time.sleep(random.randint(1, 2)) f = '{}生产的{}号{}'.format(name, k, food) q.put(f) print('{}生产了{}号{}'.format(name, k, food)) q.join() # 阻塞,直到一个队列中的所有数据全部处理完毕,等于一直在等着消费者拿了包子吃完才结束 def consumer(name, q): while True: food = q.get() print('\033[31m%s消费了%s\033[0m' % (name, food)) time.sleep(random.randint(1, 3)) q.task_done() # 减少一个计数,get方法并不减少计数,put方法会增加计数.这样就会和生产者进程最后同步,大家一起结束. if __name__ == '__main__': queue = JoinableQueue() p_list = [] for i in range(10): p = Process(target=producer, args=(str(i), '包子', queue,)) p_list.append(p) p.start() for j in range(5): c = Process(target=consumer, args=(str(j), queue,)) c.daemon = True c.start() [i.join() for i in p_list]
multiprocessing.Pipe 管道
管道也是一种组件,可以理解为一个双向的管道,有两个端点,每个端点都有发送和接受方法,从一端send东西过去,另外一端就可以recv进来.如果recv收不到,就会阻塞.
Pipe实例化会得到两个对象,是一个全双工通信管道的两端,假如两个端点为A和B,注意,A的send需要由B的recv来接收,A的recv收到的东西,是B发送的内容.
这里有一个问题就是,如果知道信息全部传送完毕.实际上,管道的引用计数是操作系统控制的,当一个管道双向全部被关闭的时候,这个管道就会关闭,再对这个管道进行操作,就会触发EOFError异常,通过捕捉这个异常,就可以知道管道是否传输完毕.
如果在主进程和多个进程间通信,需要把管道的两端都传递给所有进程,然后只用一端发的进程,就把另外一端close掉,主进程也关闭不用的一端,就像是全双工关闭了一条线路.等通信完毕之后,各自再关闭另外一端,管道就中断,引用计数变成了0,可以捕捉异常了.这里关键是理解,要用的端点,就是插在那一段数据上的管子的一头,另外一头不用了,就关闭,只用插在自己身上的那个端点.
from multiprocessing import Process, Pipe def senp(sendp, recvp): recvp.close() while True: try: msg = sendp.recv() print(msg) except EOFError: sendp.close() break if __name__ == '__main__': sendp, recvp = Pipe() Process(target=senp, args=(sendp, recvp,)).start() sendp.close() for i in range(20): recvp.send('hello') recvp.close()
其实可以理解为,主进程一根管子分支了很多根插到子进程上,子进程关闭recvp,实际是表示自己不使用这一端,主进程关闭sendp表示不使用这一头,但是其他头依然有引用.最后当主进程关闭自己的recvp这一头,所有的sendp各自被子进程关闭之后,管道的一端被所有进程完全关闭,就会引发EOF错误,表面管道通信已经结束,这个时候就退出通信即可.
将管道作用于生产者消费者模型:
from multiprocessing import Process, Pipe def producer(con, pro, name): con.close() for i in range(10): pro.send('{}的{}号产品'.format(name, i)) pro.close() def consumer(con, pro, name): pro.close() while True: try: msg = con.recv() print('{}消费了{}'.format(name, msg)) except EOFError: con.close() break if __name__ == '__main__': con_p, pro_p = Pipe() for j in range(1): Process(target=producer, args=(con_p, pro_p, '{}'.format(j))).start() for k in range(3): Process(target=consumer, args=(con_p, pro_p, '{}'.format(k))).start() con_p.close() # 在主进程里创建的管道,分别交给各个子进程,在子进程里都close之后,主进程也需要关闭,所有进程里都需要关闭所使用的管道 pro_p.close() # 所有进程内关闭管道
管道实际上是通过socket来通信的IPC,管道并不是进程安全的,如果想要安全,必须要给管道加锁.进程安全的是multiprocessing.Queue.
所以尽量使用Queue.
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
进程间数据共享
队列和管道,是进程间互相发送消息的渠道.如果所有的进程需要共享数据,一般需要通过锁来保证数据安全性.
展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
多进程其实是不推荐使用数据共享的,如果要共享,就要通过一些能够在进程之间传递的数据结构.比如multiprocessing.Manager模块.
from multiprocessing import Process, Manager def minus(dic): dic['count'] -= 1 if __name__ == '__main__': m = Manager() dic = m.dict({'count': 100}) # 这个字典会变成可用于数据共享的字典 p_list = [] for i in range(70): p =Process(target=minus, args=(dic,)) p_list.append(p) p.start() [i.join() for i in p_list] print(dic)
结果运行可以发现,有的时候结果会出现不等于50的情况,说明manager支持的数据类型里,有一部分是多进程不安全的.只要加个锁就可以了.
from multiprocessing import Process, Manager,Lock def minus(dic,lock): lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() lock = Lock() dic = m.dict({'count': 100}) # 这个字典会变成可用于数据共享的字典 p_list = [] for i in range(70): p =Process(target=minus, args=(dic,lock)) p_list.append(p) p.start() [i.join() for i in p_list] print(dic)
列出几篇文章来供多进程通信参考:
理解Python并发编程一篇就够了 – 线程篇
【Multiprocessing系列】共享资源.其中的其他文章也可以看看.
Python multiprocessing 模块解析 (2) – managers 初探轮廓