锁,信号量和事件,都是用来同步异步程序的组件.
锁
之前多线程编程的时候,都是操作逻辑,还没有实际操作需要共享的数据.但是已经知道,创建进程并且运行,是操作系统去完成的,实际的执行顺序未必可知.如果对多个进程都需要的文件进行操作,必须有一种机制来保证不会错乱,就引入了锁的抽象概念,即一个进程在操作某个I/O对象的时候,会给这个对象上一把锁,如果其他进程来访问,看到锁之后就知道了该对象现在不可访问.待锁去掉之后,再进行访问.
from multiprocessing import Process import os, time, json def show(i): with open('ticket') as f: dic = json.load(f) print('编号{}查看了余票: %s'.format(i,dic['ticket'])) def buy(i): with open('ticket',) as f: dic = json.load(f) if dic['ticket'] >0: dic['ticket'] -= 1 print('{}买到票了'.format(i)) else: print('{}没买到票'.format(i)) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) if __name__ == '__main__': for i in range(10): p = Process(target=buy,args=(i,)) p.start()
这个程序想要实现查看文件里的剩余票数,如果有票,就将票数减少1,如果为0,就返回买不到票.
但是实际运行起来,发现10个人都买到了票,这显然有问题,是因为进程运行都很快,在第一个访问文件的进程将0写入文件之前,其他进程也读取了1,结果都拿到一样的结果.
如果要改变这种情况,就在文件的门口放一把锁和挂上唯一的一把钥匙.先拿到钥匙的人把门锁上,完成工作之后,打开门然后把钥匙挂在门上,下一个人来的时候,看到门锁着,则无法访问文件,看到有钥匙,则开门进房间.
这个过程抽象成一个锁的概念.锁的使用方法:在需要使用锁的程序代码前后,用Lock()实例化的对象的acquire方法来获得锁,release方法来释放锁.
import json import time from multiprocessing import Process from multiprocessing import Lock def buy_ticket(i, lock): lock.acquire() # 拿钥匙进门 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0: dic['ticket'] -= 1 print('\033[32m%s买到票了\033[0m' % i) else: print('\033[31m%s没买到票\033[0m' % i) time.sleep(0.1) with open('ticket', 'w') as f: json.dump(dic, f) lock.release() # 还钥匙 if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=buy_ticket, args=(i, lock)) p.start() # 以下是恢复票数 time.sleep(3) with open('ticket') as f: dic = json.load(f) dic['ticket'] = 3 with open('ticket','w') as f: json.dump(dic, f)
这样每次可以看到,10个人里确实有3个人抢到了票.
注意,启动的10个进程,传入的参数是同一个对象.
信号量
在锁的基础上,如果同时有n个资源可以供超过这些资源的数目进程访问,那么同时只能让n个进程分别访问这个资源,就相当于指定数量的锁,这个时候就用信号量类,来实例化一个信号量对象,用于控制总数.
import time import random from multiprocessing import Process from multiprocessing import Semaphore def ktv(i, sem): sem.acquire() # 取一个钥匙然后进门 print('{}进入了KTV'.format(i)) time.sleep(random.randint(2, 5)) print('{}离开了KTV'.format(i)) sem.release() # 归还钥匙 if __name__ == '__main__': sem = Semaphore(4) for i in range(10): p = Process(target=ktv, args=(i, sem)) p.start()
这个程序模拟了一个只能容纳4个人的KTV房间,有10个人想唱歌,则必须等进去的人唱好出来之后,才能再进去.
实际上把sem.acquire()直接写在主进程里边,可以看到,反复调用sem.acquire(),只有前四次,之后的代码会响应,之后就会阻塞住.其实就相当于一个加了计数器的锁.
事件
对于锁和信号量可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞.
一个事件被创建之后,默认是阻塞状态.
from multiprocessing import Event if __name__ == '__main__': e = Event() # 实例化事件 print(e.is_set()) # 查看一个事件的状态,默认被设置成阻塞 print(123456)
结果是False,并且打印了123456,似乎没有阻塞,实际上,事件e是根据e.wait()的值来决定是否阻塞.
if __name__ == '__main__': e = Event() print(e.is_set()) e.set() # 设置事件的状态为True print(e.is_set()) # 用来查看一个事件的状态 e.wait() # 是否阻塞取决于set是否为True print(123456) e.clear() # 将事件的状态重新设置为False e.wait() print(123456)
第一个123456正常打印,第二个123456之前事件e发生阻塞.事件被创建的时候默认为阻塞,并且会在wait()方法的地方阻塞.设置为True的时候,不阻塞.
事件的作用,就是等待某一件事中的一个信号变化,然后做一些动作,再将状态改回来,继续再进入等待状态.这几个进程间同步控制,内部其实写了基于文件的socket,所以如果用错了,可能会出现socket错误.
用一个经典例子解释事件:
import time from multiprocessing import Process from multiprocessing import Event def light(e): while True: if e.is_set(): time.sleep(2) e.clear() print('\033[31m红灯亮了\033[0m ') else: time.sleep(2) e.set() print('\033[32m绿灯亮了\033[0m ') def car(i, e): while True: if not e.is_set(): print('{}车在等待通行') e.wait() print('{}车通过绿灯') if __name__ == '__main__': e = Event() traffic = Process(target=light, args=(e,)) traffic.start() for i in range(5): cc = Process(target=car, args=(i, e,)) cc.start()
用一个事件传递到所有的进程中,这个事件的状态一旦改变,各个进程便可以通过这个事件的状态改变作出相应的逻辑.