多线程控制组件-互斥锁和递归锁

多进程用锁的情况不多,因为进程共享数据的时候,一般用成熟的消息中间件,自带锁而且多进程安全;进程之间应该尽量少的通信,只有在确实需要自己访问数据的时候再手动加锁.但是线程锁是一个永恒的主题,用的非常多.
线程锁的模块与multiprocessing类似,也是Lock模块.从前边各个线程可以共享全局变量就知道,如果没用锁,肯定会产生数据安全,看例子:

# 线程争抢数据
from threading import Thread
import os,time

def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1

if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果可能为99

开启了100个线程对全局变量n=100来减1,结果发现结果不是0,反而是99,因为100个线程去读n,在0.1秒的时间内,都读取到了n=100,再都减,结果都是99,最后100个线程把各自的99都写回n,就得到了99.
这个和GIL锁没有冲突,因为GIL锁了同时一个线程,线程是分时间片运行,数据没有上锁,所以依然在很短的时间内会有不同的线程访问到同一个数据.所以必须对取值的部分上锁.

# 加锁后代码
from threading import Thread,Lock

def work():
    global n
    lock.acquire()
    n=n-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock = Lock()
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果可能为99

加锁牺牲了速度保证了安全性.这种锁叫做互斥锁,也就是操作这个数据和单线程操作没有什么区别.深入研究锁,看一个模型:科学家吃面:若干个科学家坐在桌子旁,桌子上只有一个叉子一个盘面条,必须同时拿到面条和叉子才能吃面.如果不同的人拿到叉子和面条,如果互相不放弃,就很可能造成死锁.

from threading import Thread, Lock
import time


def eat1(name):
    noodle_lock.acquire()
    print('{}拿到面条了'.format(name))
    time.sleep(1)
    fork_lock.acquire()
    print('{}拿到叉子了'.format(name))
    print('{}吃面'.format(name))
    fork_lock.release()
    noodle_lock.release()


def eat2(name):
    fork_lock.acquire()
    print('{}拿到叉子了'.format(name))
    time.sleep(1)
    noodle_lock.acquire()
    print('{}拿到面条了'.format(name))
    print('{}吃面'.format(name))
    noodle_lock.release()
    fork_lock.release()


if __name__ == '__main__':
    n = 100
    noodle_lock = Lock()
    fork_lock = Lock()
    Thread(target=eat1, args=('jenny',)).start()
    Thread(target=eat2, args=('cony',)).start()
    Thread(target=eat1, args=('minko',)).start()
    Thread(target=eat2, args=('gege',)).start()

可以看到,程序会卡住,这是因为jenny一开始拿到了noodle_lock,表示拿到了面条,然后准备等待1秒以后去获取fork_lock拿叉子;cony线程先去拿了fork_lock表示拿到了叉子,然后等待1秒以后去拿面条.结果1秒钟以后,Jenny准备去拿叉子,而cony准备去拿面条,由于锁没有释放,两个人互相等着拿对方的东西,结果一直卡在这里,两个人都吃不了面.这就是死锁现象.为了解决这种加锁的数据还互相依赖的问题,需要用递归锁来加锁,而不能用普通的互斥锁.

递归锁的特点就像是一串钥匙去打开一个俄罗斯套娃,先用第一个钥匙开第一个锁,再用第二个钥匙开第二层锁,直到开到最里边,加锁的时候则是反过来.即同一把锁可以反复上锁,每次锁一层:

# 递归锁
from threading import Thread, RLock,Lock
import time


def eat1(name):
    noodle_lock.acquire()  # 拿到第一个锁,这个时候其他线程已经拿不到了
    print('{}拿到面条了'.format(name))  # 拿到第一个数据
    time.sleep(1)
    fork_lock.acquire()  # 再加一层锁,深入到另一数据
    print('{}拿到叉子了'.format(name))  # 拿到第二个数据
    print('{}吃面'.format(name))  # 完成操作
    fork_lock.release()  # 还回第一把钥匙
    noodle_lock.release()  # 还回第二把钥匙,释放之前所有数据.


def eat2(name):
    fork_lock.acquire()
    print('{}拿到叉子了'.format(name))
    time.sleep(1)
    noodle_lock.acquire()
    print('{}拿到面条了'.format(name))
    print('{}吃面'.format(name))
    noodle_lock.release()
    fork_lock.release()


if __name__ == '__main__':
    n = 100
    noodle_lock = fork_lock = RLock()   # 一个钥匙串上的两把钥匙
    Thread(target=eat1, args=('jenny',)).start()
    Thread(target=eat2, args=('cony',)).start()
    Thread(target=eat1, args=('minko',)).start()
    Thread(target=eat2, args=('gege',)).start()

上边的这段代码是故意为了演示两层不同名字的锁对应不同的数据,实际上可以修改成下边的样子更容易理解:

from threading import Thread, RLock,Lock
import time


def eat1(name):
    rlock.acquire()  # 第一层递归锁
    print('{}拿到面条了'.format(name))  # 拿到第一个数据
    time.sleep(1)
    rlock.acquire()  # 第二层递归锁
    print('{}拿到叉子了'.format(name))  # 拿到第二个数据
    print('{}吃面'.format(name))  # 完成操作
    rlock.release()  # 解开第二层递归锁
    rlock.release()  # 解开第一层递归锁,其他线程可以取得该锁.


def eat2(name):
    rlock.acquire()
    print('{}拿到叉子了'.format(name))
    time.sleep(1)
    rlock.acquire()
    print('{}拿到面条了'.format(name))
    print('{}吃面'.format(name))
    rlock.release()
    rlock.release()


if __name__ == '__main__':
    n = 100
    rlock = RLock()   # 一把递归锁
    Thread(target=eat1, args=('jenny',)).start()
    Thread(target=eat2, args=('cony',)).start()
    Thread(target=eat1, args=('minko',)).start()
    Thread(target=eat2, args=('gege',)).start()

这里如果改用互斥锁,会卡死在获取第二个数据上边,因为互斥锁无法再反复加锁.进程中也有互斥锁和递归锁的用法,进程中也会产生死锁的现象.

多线程控制组件-信号量

信号量的用法和进程里几乎相同,用于控制同一时刻可以有多少线程进行信号量指定的任务.

from threading import Thread, Semaphore
import time


def func(sem, a, b):
    sem.acquire()
    time.sleep(1)
    print(a * b)
    sem.release()


if __name__ == '__main__':
    sem_1 = Semaphore(4)
    for i in range(20):
        Thread(target=func, args=(sem_1, i, i + 4,)).start()

可见结果是4个4个的冒出来,也就是同时只有4个任务执行了sem.acquire到sem.release之间的代码.与进程一样,需要将一个信号量对象同时传入所有需要该信号量控制的线程中.

多线程控制组件-事件

事件的用法和进程也几乎相同,需要将事件传入到所有需要该事件的线程中.线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行.

除了红绿灯模型之外,看一个常用的多线程访问数据库的程序:
启动两个线程:
线程A用于连接数据库,在连接之前该线程需要等待一个信号,即数据库可连接
线程B用于测试网络是否通畅,去和数据库连通,如果超过2秒钟没有反应,则表明该网络不通.如果通,再允许A去访问数据库.

from threading import Thread, Event
import time
import random


def check_web(evn):  # 模拟网络延时
    time.sleep(random.randint(0, 5))
    evn.set()


def conn_db(evn):
    count = 1
    while count <= 3:
        evn.wait(1)  # wait的参数可以指定等待的时间,超时就会继续执行下边的代码,不会在此处阻塞
        if evn.is_set() == True:
            print('连接数据库')
            break
        else:
            print('第{}次连接失败'.format(count))
            count += 1
    else:
        raise TimeoutError('数据库连接失败')


if __name__ == '__main__':
    event = Event()
    t1 = Thread(target=check_web, args=(event,))
    t2 = Thread(target=conn_db, args=(event,))
    t1.start()
    t2.start()

多线程控制组件-条件

看视频里稀里糊涂的没怎么听明白,这里我找了两篇文章: python多线程编程(5): 条件变量同步Python线程指南 看了一下,然后把其中一个生产者消费者模型自己改写了一下,终于明白了一点,试着用自己的语言解释一下条件.

# 条件锁实现生产者消费者模型
from threading import Thread, Condition
import time

def consumer(con, i):
    global n
    while True:
        con.acquire()
        if n > 100:
            n = n - 3
            print('第{}号消费者消费了3个东西,还有{}个东西'.format(i, n))
            con.notify()
            time.sleep(0.1)
        else:
            con.wait()
        con.release()

def producer(con, i):
    global n
    while True:
        con.acquire()
        if n <= 100:
            n = n + 200
            print('第{}号生产者生产了200个东西'.format(i))
            con.notify()
            time.sleep(0.1)
        else:
            con.wait()
        con.release()


if __name__ == '__main__':
    n = 189
    con = Condition()
    for i in range(3):
        Thread(target=consumer, args=(con, i)).start()
    for i in range(3):
        Thread(target=producer, args=(con, i)).start()

这个程序运行之后的效果,就是各个消费者不断消费n,当n<=100的时候停止消费,由生产者生产200个东西投入市场,然后消费者继续消费.而且最关键的是,这个程序还有一定概率卡住进入死锁.

这个程序分析如下:首先,无论是增加n的生产者还是减少n的消费者,都要操作全局变量n,根据之前互斥锁的概念,则所有生产者和消费者需要处于同一把互斥锁下.于是建立一个条件锁con,传递给所有线程.

程序启动以后,所有消费者和生产者都去抢锁,不管执行顺序如何,一轮抢下来,三个生产者都进入wait阻塞,三个消费者继续争抢锁,每一次消费者抢到锁执行之后,都会发一个notify信号,复活一个生产者线程,然后这个生产者线程因为条件不满足,再次进入阻塞.

到了n大于100之前的最后一次执行n = n -3之后,必定会复活一个生产者线程,此时有三个活的消费者线程和一个生产者线程抢锁,这一轮下来,很可能消费者线程陷入阻塞,被生产者复活;但关键的是,也有可能三个消费者先进入阻塞,生产者发出的notify信号复活了其他生产者线程,判断满足条件就不再发送notify,因为无法再发送notify信号复活新的线程,导致程序死锁,这与分析的结果相同.如果想要改进,目前先简单的把生产者改为1个,即生产者发送notify信号的时候,肯定不会复活其他生产者,就可以解决这个问题.而且可以预测的是,如果增加生产者的数量,程序容易更早的进入死锁,修改生产者到100,确实也会出现这种情况.

还有一个原因是,将消费者低于某个限界停止消费和生产者低于某个限界就生产产品,两个限界值设置成了一样,即consumer和producer函数里的判断条件都是100,就会按照上边的分析导致出现死锁.之前的链接里的程序,如果将临界点修改成一样,也会发生死锁,如果将临界点拉开到一定程度(这里试验了一些情况,应该是与每次生产的数量及生产者消费者个数有关系,稍微高于消费者临界依然会卡住),则同一时刻会有同时生存的生产者和消费者线程,程序就不会死锁.

多线程控制组件-定时器

定时器用于在固定的时间后触发某时间.使用比较简单,看这里的一篇文章即可.唯一要注意的一点就是定时器只能触发一次,如果想要反复触发,需要写在循环内.

多线程通信组件-队列

多线程队列与多进程队列略有不同,直接通过queue模块的Queue对象使用即可,queue模块下边有几种队列,都是线程安全的队列.
本来线程之间可以共享变量,但是由于数据不安全,需要加锁.所以线程间通信,也推荐使用成熟的消息中间件.

# 先进先出的标准队列
from threading import Thread
import queue

q = queue.Queue(n)  # 方法与multiprocessing.Queue一样
q.get()
q.get_nowait()
q.put()
q.put_nowait()
# 后进先出队列,其实就是一个栈,可以用来模拟栈
from threading import Thread
import queue

q = queue.LifoQueue()

q.put(31)
q.put_nowait(34)
print(q.get())
print(q.get_nowait())
# 优先级队列
from threading import Thread
import queue

q = queue.PriorityQueue()

q.put((10, 'fd',))
q.put_nowait((20, 'adsf',))
q.put_nowait((10, 'sadf',))
q.put_nowait((-5, 'dd',))
q.put_nowait((1, 'f',))
print(q.get())
print(q.get())
print(q.get())
print(q.get())

优先级队列每次放入一个元组,元组的第一个元素是优先级,第二个元素是消息.
越小优先级越高,也支持负数,优先级也可以是浮点数.取出的也是元组.如果优先级相同,按照ASCII码的大小顺序进行取出,小的优先取出.

线程池

原来的python是没有线程池的,后来的版本中出现了线程池.通过concurrent.futures导入ProcessPoolExecutor和ThreadPoolExecutor,其中前者是进程池,功能与multiprocessing里边的Pool一样,后者是线程池.
这两个模块的方法是相同的.所以python里目前有两个进程池,一个线程池可供使用.

from concurrent.futures.thread import ThreadPoolExecutor
import time


def func(n):
    time.sleep(1)
    print(n)
    return n + 1


p = ThreadPoolExecutor(max_workers=40)

res_l = []

for i in range(10):
    res = p.submit(func, i)  # 异步提交并启动任务,也可以用map方法res = p.map(func, range(40))
    res_l.append(res)
p.shutdown()  # 等于close+join方法
print([i.result() for i in res_l])  # 和进程池一样,线程池支持取得返回值.

如果使用进程池,只需要把ThreadPoolExecutor修改成ProcessPoolExecutor就可以了,二者的方法完全一致.
注意,使用map是无法取得返回值.

from concurrent.futures.thread import ThreadPoolExecutor
import time


def func(n):
    time.sleep(1)
    print(n)
    return n + 1


p = ThreadPoolExecutor(max_workers=4)

res_l = []

for i in range(20):
    res = p.submit(func, i)  
    res_l.append(res)

for t in res_l:
    print(t.result())

如果代码改成上面这样,去掉shutdown,并且不再使用列表推导式来等全部结果都出来之后再打印,可以看到主线程获得了多少结果就会打印出多少结果.

回调函数

from concurrent.futures.thread import ThreadPoolExecutor
import time


def func(n):
    time.sleep(1)
    print(n)
    return n + 1

def callbackfunc(n):
    print('回调函数的结果是{}'.format(n.result()))

p = ThreadPoolExecutor(max_workers=4)

res_l = []

for i in range(20):
    res = p.submit(func, i).add_done_callback(callbackfunc)

需要注意的一点是,回调函数拿到的参数n是一个对象,需要用.result()取到里边的值.

至此完成了所有的并发编程系列,由于GIL锁的限制,实际上在python内,多进程是并行,而多线程是并发.并行和并发编程为基础的编程工作提供了更广阔的思路,在应用并发编程的时候,需要注意消息的传递以及进程和线程的管理.