进程池

进程并不是越多越好,由于进程的启动,销毁和调度都需要消耗资源,所以开过多的进程会消耗大量资源,而并不能够获得更高的性能.
所以提出了进程池的概念,先启动一定个数的进程,然后等待任务,用这些进程去执行这些任务,执行完任务后,这个进程接受新的任务.这就是进程池的概念.
python中的进程池用multiprocessing.Pool模块控制.Pool模块是一个简单的进程池模块,只能指定固定个数的进程,不能像有些语言一样可以设定上限和下限.

# 进程池例子
import time
from multiprocessing import Pool


def test(n):
    print('This is fun{} running'.format(n))
    time.sleep(1)
    print('This is fun{} ending'.format(n))


if __name__ == '__main__':
    pool = Pool(9)
    pool.map(test, range(10))  # map后边传一个序列对象作为参数,如果想要传多个参数,可以用元组传递,在函数内部拆解

用map方法执行的时候,默认异步调用,然后自带.close()和.join()方法,就和之前用Process的情况一样.除此之外,还可以用apply和apply_async方法可以启动进程池:

import time
from multiprocessing import Pool
import os


def test(n):
    print('This is fun{} running'.format(n))
    print('My id is', os.getpid())
    time.sleep(1)
    print('This is fun{} ending'.format(n))


if __name__ == '__main__':
    pool = Pool(9)
    for i in range(10):
        pool.apply(test, args=(i,))  # 同步启动线程

经过运行可知,apply是同步提交任务,任务依次完成.异步的方法是apply_async

import time
from multiprocessing import Pool
import os


def test(n):
    print('This is fun{} running'.format(n))
    print('My id is', os.getpid())
    time.sleep(1)
    print('This is fun{} ending'.format(n))


if __name__ == '__main__':
    pool = Pool(9)
    for i in range(10):
        pool.apply_async(test, args=(i,))  # 同步启动线程

结果发现主进程直接执行完毕,没有等待子进程,async是真异步.如果要改变成通常状态,需要在所有的任务提交完毕之后,先用pool.close()让进程池不再接受新的任务.此后再用pool.join()来感知进程池中的任务执行结束(进程池中的进程一直存在,但是任务会结束)

import time
from multiprocessing import Pool
import os


def test(n):
    print('This is fun{} running'.format(n))
    print('My id is', os.getpid())
    time.sleep(1)
    print('This is fun{} ending'.format(n))


if __name__ == '__main__':
    pool = Pool(9)
    for i in range(10):
        pool.apply_async(test, args=(i,))
    pool.close()  # 结束进程池接受任务
    pool.join()  # 感知进程池中的任务都结束了

这样的结果就和原来的传统方法一样了.

用多进程可以比较方便的实现并发,用进程池写一个自己实现的socketserver:

# Mysocketserver
import socket
from multiprocessing import Pool


def handler(conn, addr):
    print('Connection established from:', addr)
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())


if __name__ == '__main__':
    pool = Pool()
    buffer_size = 1024
    ip_port = ('127.0.0.1', 8080,)
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sk.bind(ip_port)
    sk.listen()
    while True:
        conn, addr = sk.accept()
        pool.apply_async(handler, args=(conn, addr,))

每生成一个新的连接,就把这个连接丢到一个新的进程中,用handler去处理它.然后循环继续监听下一个连接.

进程池的返回值

如果进程中的func有返回值,会如何呢?进程池不像Process,可以拿到进程执行函数的返回值.

from multiprocessing import Pool


def func(i):
    return i*i


if __name__ == '__main__':
    pool = Pool()
    for j in range(10):
        res = pool.apply(func,args=(j,))
        print(res)

可以发现,拿到了各个进程调用函数的返回值.正常情况下,子进程里return一个值无法被父进程直接接收,只能通过IPC来实现.但是在用进程池的同步方法时,可以拿到各个进程调用函数的返回值.
如果改成apply_async,试验下边的代码:

from multiprocessing import Pool
import time

def func(i):
    time.sleep(2)
    return i*i


if __name__ == '__main__':
    pool = Pool()
    for j in range(10):
        res = pool.apply_async(func,args=(j,))
        print(res.get())  # get这里会阻塞,导致立刻计算出上一步的apply_async对象的结果

发现依然是同步结果,这是因为.get会阻塞,所以虽然异步提交,但依然是同步运行.如果想要从异步的程序里获得结果,依据一样的思路,将res放到一个列表里去,当所有的任务都结束以后,用for循环去get一下结果.

from multiprocessing import Pool
import time


def func(i):
    time.sleep(2)
    return i * i


if __name__ == '__main__':
    pool = Pool()
    res_list = []
    for j in range(10):
        res = pool.apply_async(func, args=(j,))
        res_list.append(res)
    for k in res_list:
        print(k.get())

可以看到先得到了第一批(进程池大小个数)的结果,之后是剩下的结果.
如果用map改写,由于map自带close和join,直接接收结果看看:

from multiprocessing import Pool
import time


def func(i):
    time.sleep(0.5)
    return i * i


if __name__ == '__main__':
    pool = Pool()
    res = pool.map(func,range(10))
    print(res)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

map一次性会把所有的结果做成一个列表返回.在各个进程执行时间差异比较大的时候,一般还是使用async来部分取得返回结果.

回调函数

先来看例子:

from multiprocessing import Pool


def func1(n):
    print('in func')
    return n * 4


def func2(nn):
    print('in func2')
    print('in fun2',nn)


if __name__ == '__main__':
    pool = Pool(5)
    res = pool.apply_async(func1, args=(10,), callback=func2)
    print(res.get())
    pool.close()
    pool.join()

异步启动了一个进程,执行了func1,传参数10,func1执行的结果交给callback回调函数作为参数执行.注意,res这个时候拿到的依然是func1的返回值,不是回调函数的值.
回调函数不能通过进程池再传其他参数,其参数必须通过进程内调用的函数返回.回调函数是在主进程里执行.
回调函数因为是在主进程中执行,经常会在爬虫中用到.一般在爬虫中最耗时的是网络延迟,举一个例子:

import requests
from multiprocessing import Pool


def get_content(url):
    a = requests.get(url)
    return a.content.decode('utf-8'), url


def count_length(args):
    content,url = args
    print(url,len(content))
    return len(content)


if __name__ == "__main__":
    url_lst = [
        'http://www.cnblogs.com',
        'http://www.baidu.com',
        'http://www.sogou.com',
        'http://www.sohu.com'
    ]
    pool = Pool(5)
    for i in url_lst:
        pool.apply_async(get_content, args=(i,),callback=count_length)
    pool.close()
    pool.join()

这样可以显示出从各个网站取得字符串的长度.还有一个爬虫例子,暂且放着,回头来看.
到这里进程相关的内容就学习完了.知道了多进程之后,编程序可以启动同时启动多项工作来进行,如果各个进程直接需要IPC,则要小心的使用进程安全的传递方式.根据任务的分配,可以选择异步,也可以将各个子进程的结果进行同步.
这里练习比较少,因为多进程主要是附加在任务上的一种方式,需要多多练习才能掌握.