Python 多进程开发

进程的创建以及使用

一、创建进程

方式一:直接创建

​ process 模块是 multiprocessing库一个创建进程的模块,借助这个模块,就可以完成进程的创建。

​ Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任 务(尚未启动)

参数说明:

  • group 参数未使用,值始终为None
  • target 表示调用对象,即子进程要执行的任务
  • args 表示调用对象的位置参数元组,args=(1,2,’egon’,)
  • kwargs 表示调用对象的字典,kwargs={‘name’:’egon’,’age’:18}
  • name 为子进程的名称
1
2
3
4
5
6
7
8
9
10
11
# 补充:os模块的 getpid 方法用于获取当前进程的进程号,getppid 方法用于获取当前进程的父进程的进程号
from multiprocessing import Process
import os

def func(name):
print("我是一个子进程%s,我的进程号是%s,我的父进程的进程号是%s" % (name, os.getpid(), os.getppid()))

if __name__ == '__main__':
print("我是父进程,我的进程号是%s" % os.getpid())
p = Process(target=func, args=("cdc",)) # 实例化一个进程对象
p.start() # 开启一个子进程

方式二:通过类继承的方式来创建

1
2
3
4
5
6
7
8
9
10
class MyProcess(Process):
def __init__(self):
super().__init__()

def run(self):
print("开始启动子进程")

if __name__ == '__main__':
p = MyProcess()
p.start()

Process对象除了有 start 方法外,还有一个 run 方法,两者的区别是:

  • p.start():是指解释器告诉操作系统去帮我开启一个进程,至于什么时候执行,由操作系统来调度,即处于就绪状态
  • p.run():是指告诉操作系统,现在马上帮我执行这个子进程,即处于执行状态

当通过类继承的方式来创建一个子进程的时候,执行 start 方法会自动执行类中定义的 run 方法。

方式三:开启多个不同的子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def func():
print("子进程 %s,父进程 %s" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
print("父进程 %s" % os.getpid())
for i in range(5):
p = Process(target=func, args=())
p.start()

"""
父进程 7476
子进程 7876,父进程 7476
子进程 16044,父进程 7476
子进程 13908,父进程 7476
子进程 18332,父进程 7476
子进程 9312,父进程 7476
"""

注:在Windows操作系统中由于没有 fork (linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将Process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用 if __name__ ==’__main__‘ 判断保护起来,import 的时候 ,就不会递归运行了。

二、进程常用方法

1
2
3
4
5
p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
  • is_alive 和 terminate
1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
import time

def func():
time.sleep(200) # 睡眠 200 秒

if __name__ == '__main__':
p = Process(target=func, args=()) # 创建一个子进程
p.start()
print(p.is_alive()) # 判断子进程是否存活 存活结果为True,死亡结果为False
time.sleep(10)
print(p.is_alive())
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process
import time

def func():
time.sleep(200) # 睡眠 200 秒

if __name__ == '__main__':
p = Process(target=func, args=()) # 创建一个子进程
p.start()
print(p.is_alive()) # True
p.terminate() # 杀死该进程
print(p.is_alive()) # True
print(p.is_alive()) # True

理论上来说,杀死子进程后得到的进程存活状态应该是 False,但是实际测试得到的还是 True,这是由于操作系统在切换和调度进程时都需要时间,而我们的代码执行的速度特别快,我们无法直接捕捉该现象,可以让程序等待一定的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import time

def func():
time.sleep(200) # 睡眠 200 秒

if __name__ == '__main__':
p = Process(target=func, args=()) # 创建一个子进程
p.start()
print(p.is_alive()) # True
p.terminate() # 杀死该进程
time.sleep(0.1)
print(p.is_alive()) # False
print(p.is_alive()) # False
  • join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Process
import time

def func():
for i in range(500):
time.sleep(0.01)
print("我是子进程")

if __name__ == '__main__':
p = Process(target=func) # 创建一个子进程
p.start()

for i in range(100):
time.sleep(0.01)
print("我是父进程")

"""
...
我是父进程
我是父进程
我是父进程
我是子进程
我是父进程
我是子进程
我是父进程
我是子进程
我是父进程
...
"""

开启一个正常的子进程,父进程会等待子进程结束后,父进程也就是程序才结束,但是并不是先执行完子进程的方法后才会去执行父进程中的方法,父进程和子进程是异步执行的。

p.join() 方法就可以使父进程和子进程变为同步执行,父进程执行到join方法时,就会阻塞住,等待子进程执行完成后再继续执行主进程中的代码。

join必须放在start()后边

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process
import time

def func():
for i in range(500):
time.sleep(0.01)
print("我是子进程")

if __name__ == '__main__':
p = Process(target=func) # 创建一个子进程
p.start()
p.join()

for i in range(100):
time.sleep(0.01)
print("我是父进程")

"""
...
我是子进程
我是子进程
我是子进程
我是父进程
我是父进程
我是父进程
...
"""

多个子进程和主进程实现同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process
import time

def func(num):
time.sleep(0.5)
print(num ** 2)

if __name__ == '__main__':
p_l = list() # 用于保存所有的子进程对象
for i in range(10):
p = Process(target=func, args=(i,))
p.start()
p_l.append(p)

[p.join() for p in p_l] # 对所有的子进程进行join
time.sleep(0.5)
print("计算结束")

三、进程常用属性

1
2
3
4
5
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None,如果为–N,表示被信号N结束(了解即可)
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32个字符。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
  • name 和 pid 属性
1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
import os

def func():
print(f"这里是子进程{os.getpid()}")

if __name__ == '__main__':
for i in range(3):
p = Process(target=func)
p.name = f"cdc{str(i)}"
p.start()
print(f"子进程{p.name}开始执行,进程号{p.pid}")
  • daemon 属性

使用 p.daemon = True 可以将普通的子进程设置成守护进程。对于一个正常的子进程,主进程和子进程是异步的执行各自的代码的,如果主进程代码已经执行结束,子进程代码还未执行完,那么主进程就会等待所有的子进程执行结束后再结束,即结束整个程序;而对于守护进程而言,会随着主进程代码的执行结束而立即结束

注意:守护进程是随着主进程的代码的结束而结束,而不是随着主进程的结束而结束。换句话说,正常执行中主进程永远是最后结束的,哪怕自己的代码已经全部执行完了,也不会结束,而是阻塞等待所有的子进程执行完了再结束,即结束整个程序。但只要主进程中的代码执行完了,守护进程肯定就跟着结束了。守护进程必须在 start 之前设置。

守护进程的三个特点:

  1. 守护进程会随着父进程代码的结束而结束

               2. 守护进程自身不能再创建子进程
               3. 守护进程必须要在start之前设置
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process
import time

def func1():
print("这是孙子")

def func():
print("这是儿子")
time.sleep(1)
p = Process(target=func1)
p.start()

if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # 设置守护进程
p.start()
print(p.daemon)
time.sleep(5)
print("这是爸爸")
# AssertionError: daemonic processes are not allowed to have children 守护进程不允许有子进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 没有设置守护进程
from multiprocessing import Process
import time

def func():
time.sleep(10)
print("子进程代码结束")

if __name__ == '__main__':
p = Process(target=func)
p.start()
time.sleep(3)
print("父进程代码执行结束")
# 父进程三秒后就将代码全部执行完成了,阻塞等待子进程结束,最后载结束程序,结果为
"""
父进程代码执行结束
子进程代码结束
Process finished with exit code 0
"""


# 设置守护进程
from multiprocessing import Process
import time

def func():
time.sleep(10)
print("子进程代码结束")

if __name__ == '__main__':
p = Process(target=func)
p.daemon = True
p.start()

time.sleep(3)
print("父进程代码执行结束")
# 父进程三秒后就将代码全部执行完成了,此时守护进程跟随父进程代码的执行结束而结束,所以结果为
"""
父进程代码执行结束
Process finished with exit code 0
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 利用守护进程编写计时器
# 计时5秒

from multiprocessing import Process
import time

def func():
for i in range(10):
time.sleep(1)
print(time.strftime('%H:%M:%S'))

if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # 将p进程设置为守护进程,必须要在start之前设置
p.start()
time.sleep(5)
print('这是爸爸')

进程间通信

一、进程间无法共享内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process

def func(num):
num -= 1

if __name__ == '__main__':
num = 100
p_l = list()
for i in range(5):
p = Process(target=func, args=(num,))
p.start()
p_l.append(p)
[p.join() for p in p_l]

print(num) # 100

以上代码创建了五个子进程,分别对全局变量 num 进行了减1的操作,理论上来说,num 最后的值应该是95,然而测试得到的结果还是100,这是因为每次执行子进程实际相当于重新开辟了一个空间,将子进程的代码和相关的变量都拷贝一份过去再操作,所以不管子进程中对全局的变量进行什么操作,都不会影响原本内存中变量对应的值,这也就是多进程之间无法共享内存的原因。

二、进程间直接的数据共享

虽然进程间数据独立,但可以通过Manager和Value实现数据共享

  • Manager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process, Manager

def func(lst):
lst[0] = "aaa"
print("子进程中的列表", lst) # 子进程中的列表 ['aaa', 22, 33]

if __name__ == '__main__':
m = Manager()
lst = m.list([11, 22, 33]) # 创建一个共享的列表数据
# dic = m.dict({"name": "cdc"}) # 创建一个共享的字典数据
print("父进程中一开始的列表", lst) # 父进程中一开始的列表 [11, 22, 33]
p = Process(target=func, args=(lst,))
p.start()
p.join()
print("父进程中后来的列表", lst) # 父进程中后来的列表 ['aaa', 22, 33]
  • Value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process, Value

def func(num):
num.value -= 1
print("子进程中的num值", num.value) # 子进程中的num值 99

if __name__ == '__main__':
num = Value("i", 100) # 创建一个数值为100的共享整型数据

print("父进程中一开始的num", num.value) # 父进程中一开始的num 100
p = Process(target=func, args=(num,))
p.start()
p.join()
print("父进程中后来的num", num.value) # 父进程中后来的num 99

除了直接实现数据共享外,multiprocessing 中可以通过以下机制来实现进程间通信(IPC,inter process Communication)

三、锁机制

多个进程同时共享内存时会出现数据错乱的问题,例如如果多个进程同时向一个文件中写入,可能会出现一个进程刚写完还未来得及将写好的内容保存,另外一个进程已经将文件关闭了,此时第一个进程所写入的内容就丢失了。因此,我们可以给这个文件上一把锁,如果当前有进程正在操作文件,那么就不允许其他的进程再来操作这个文件了,只能等当前的进程操作结束后,再把锁打开,其他进程才能来使用文件。重复上述步骤,就不会出现数据错乱丢失的问题了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Lock

l = Lock() # 实例化一个锁
l.acquire() # 相当于拿走钥匙,锁上门,不允许其他人进来
l.release() # 释放锁,还钥匙,开门,允许其他人操作

# 如果加上锁后一直不释放,程序会阻塞等待,直到有锁被释放
from multiprocessing import Lock

l = Lock()
l.acquire()
print("aaaa")
l.acquire()
print("bbbb")
# 程序输出 aaaa 后一直阻塞等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 多进程操作同一个文件
from multiprocessing import Lock, Process
import os

def hand_file(l):
l.acquire() # 给文件加锁,不允许别的进程访问
with open(file="a.txt", mode="a+", encoding="utf-8") as f:
f.write(f"我是子进程{os.getpid()}" + "\n")
l.release() # 释放锁,允许其他进程操作文件

if __name__ == '__main__':
l = Lock()
for i in range(5):
p = Process(target=hand_file, args=(l,))
p.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 模拟银行卡存钱取钱
from multiprocessing import Lock, Process, Value
import time

# 取钱
def get_money(l, money):
l.acquire() # 取钱的时候不允许别人对同一个账号存钱
for i in range(6):
money.value -= 100
time.sleep(0.5)
l.release() # 取完钱了,可以让别人存钱了

# 存钱
def save_money(l, money):
l.acquire() # 存钱的时候不允许别人对同一个账号取钱
for i in range(3):
money.value += 100
time.sleep(0.5)
l.release() # 存完钱了,可以让别人取钱了

if __name__ == '__main__':
money = Value("i", 1000) # 定义 1000块钱
l = Lock() # 实例化一个锁
p1 = Process(target=get_money, args=(l, money)) # 该进程用于取钱操作
p1.start()
p2 = Process(target=save_money, args=(l, money)) # 该进程用于存钱操作
p2.start()
p1.join()
p2.join()
print(money.value)

进程中还会存在死锁情况,需要借助递归锁来解决,由于死锁和递归锁原理和多线程相同,此处在多线程中再详细介绍。

四、信号量机制

信号量机制相当于是一把锁配好几把钥匙。信号量机制比锁机制多了一个计数器,这个计数器是用来记录当前剩余几把钥匙的。对于计数器来说,每acquire一次,计数器内部就减1,release一次,计数器就加1,当计数器为0时,表示没有钥匙了,此时acquire()处于阻塞,直到有钥匙归还才解除阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Semaphore

sem = Semaphore(5) # 参数为整型,表示可以有多少把钥匙,此处表示有5把
sem.acquire() # 拿走钥匙,锁上门
print(1111)
sem.acquire() # 拿走钥匙,锁上门
print(2222)
sem.acquire() # 拿走钥匙,锁上门
print(3333)
sem.acquire() # 拿走钥匙,锁上门
print(4444)
sem.acquire() # 拿走钥匙,锁上门
print(5555)
sem.acquire() # 此时钥匙已经被前面全部拿完了,acquire会阻塞在这里,直到有人归还钥匙
print(6666)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 多人同时就餐示例
# 一共有五个餐位,后来的人要等前面有空位才能就餐
from multiprocessing import Semaphore, Process
import random, time

def eat(i, sem):
sem.acquire() # 占一个位置吃饭
print(f"第{str(i)}位客人准备吃饭了")
time.sleep(random.uniform(1, 5))
print(f"第{str(i)}位客人吃完了")
sem.release() # 吃完了,把位置腾出来给别人吃饭

if __name__ == '__main__':
sem = Semaphore(5) # 初始化了一把锁5把钥匙,也就是说允许5个人同时进入餐厅,
# 之后其他人必须等待,等有人从餐厅出来,还了钥匙,才能允许后边的人进入
# 来了20位客人
for i in range(20):
p = Process(target=eat, args=(i, sem))
p.start()

五、事件机制

事件机制 Event 对象主要有以下几种方法:

  1. is_set():is_set 的值是布尔类型,事件机制通过该值来判断 wait 方法是否应该处于阻塞状态,is_set 的值为 True,则 wait 处于非阻塞状态,否则处于阻塞状态
  2. wait():判断is_set的值,如果为True,则非阻塞;值为False,则阻塞
  3. set():将is_set设为True
  4. clear():将is_set设为False
1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Event

e = Event() # 实例化一个事件对象
print(e.is_set()) # False 此时wait应该是阻塞的
e.set()
print(e.is_set()) # True 此时wait是非阻塞的
e.wait()
print("aaaa")
e.clear()
print(e.is_set()) # False 此时wait是阻塞的,wait后面的代码无法继续执行
e.wait()
print("bbbb")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 使用事件机制模拟信号灯场景
from multiprocessing import Process, Event
import time

# 信号灯函数
def traffic_light(e):
# 交通等得一直亮着,要么是红灯,要么是绿灯
while True:
if e.is_set(): # 如果is_set是True,那么就是绿灯,wait非阻塞,汽车就通过
time.sleep(5) # 绿的亮5秒
print("红灯亮了") # 5秒后,切换到红灯
e.clear() # 将is_set的值设为False,wait阻塞,汽车无法通过
else:
time.sleep(3) # 红的亮三秒
print("绿灯亮了") # 5秒后,切换到绿灯
e.set() # 将is_set的值设为True,wait非阻塞,汽车可以通过

# 车的函数
def Car(e, i):
e.wait()
print(f"第{str(i)}辆车通过")

if __name__ == '__main__':
e = Event()
light = Process(target=traffic_light, args=(e,))
light.start()
# 定义50辆车
for i in range(50):
if i % 3 == 0: # 为了测试效果明显,控制一下汽车生产的速度
time.sleep(5)
p = Process(target=Car, args=(e, i))
p.start()

六、队列

  • 队列:先进先出(FIFO,First In First Out)
  • 栈:先进后出(FILO,First In Last Out)

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递,队列在进程间通信是安全的。队列有以下常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在指定的时间间隔内没有项目可用,将引发Queue.Empty异常。

q.get_nowait( )
q.get(False)方法。

q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Full异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)

q.close()
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread()
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread()
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
1
2
3
4
5
6
7
8
9
10
# put
from multiprocessing import Queue

q = Queue(3) # 实例化一个队列,参数为规定队列中最多可以存放的数据个数
q.put(123)
q.put("abc")
q.put([1, 2, 3])
print("到此为止队列已经存满了") # 队列已经存放了三个数据了,满了
# q.put(236) # 此时 q.put(236) 会阻塞等待,知道队列空出一个位置,即有数据被取出来
q.put(236, False) # 此时不会阻塞,直接引发 queue.Full 错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# get
from multiprocessing import Queue

q = Queue(3) # 实例化一个队列,参数为规定队列中最多可以存放的数据个数
q.put(123)
q.put("abc")
q.put([1, 2, 3])

print(q.get()) # 123
print(q.get()) # abc
print(q.get()) # [1, 2, 3]
print("此时队列已经全部取完了")
# print(q.get()) # 队列中已经没有值了,此时get会阻塞等待,直到队列中有值
print(q.get(False)) # 将block设置为False后不会阻塞,直接引发queue.Empty错误
1
2
3
4
5
6
7
8
9
# put_nowait
from multiprocessing import Queue

q = Queue(3) # 实例化一个队列,参数为规定队列中最多可以存放的数据个数
q.put(123)
q.put("abc")
q.put([1, 2, 3])
print("到此为止队列已经存满了")
q.put_nowait(236) # 不会阻塞,直接引发queue.Full异常,等同于 q.put(236, False)
1
2
3
4
5
6
7
8
9
10
11
12
13
# get_nowait
from multiprocessing import Queue

q = Queue(3) # 实例化一个队列,参数为规定队列中最多可以存放的数据个数
q.put(123)
q.put("abc")
q.put([1, 2, 3])

print(q.get()) # 123
print(q.get()) # abc
print(q.get()) # [1, 2, 3]
print("此时队列已经全部取完了")
print(q.get_nowait()) # 不会阻塞,直接引发queue.Empty异常,等同于 q.get(False)
1
2
3
4
5
6
7
8
9
10
11
# 进程中使用队列
from multiprocessing import Queue,Process
def func(q):
q.put('我是四川的')

if __name__ == '__main__':
q = Queue(5)
p = Process(target=func,args=(q,))
p.start()
# print(q.get_nowait()) # 此处,可能会报错,因为子进程和父进程同时运行,不一定队列中有数据
print(q.get())# 此处一定不会报错,因为get是阻塞获取数据,如果队列没有就等着
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 使用队列q对象调用get函数来取得队列中最先进入的数据

from multiprocessing import Process, Queue,freeze_support
import random
import os

def put_func(q):
info = str(os.getpid()) + '\t:\t' + str(random.randint(0, 100))
q.put(info)

def get_func(q):
print('%s 获取到数据 :\033[33m; %s \033[0m' % (os.getpid(), q.get()))

if __name__ == '__main__':
# freeze_support() 如果有windows系统开启多进程导致程序崩溃,可尝试调用此函数
q = Queue(5)
l_put = []
l_get = []
for i in range(10):
p_put = Process(target=put_func, args=(q,))
p_put.start()
l_put.append(p_put)

for i in range(10):
p_get = Process(target=get_func, args=(q,))
p_get.start()
l_put.append(p_get)

[i.join() for i in l_put]
[i.join() for i in l_get]

七、JoinableQueue 可连接的队列

JoinableQueue常用于解决生产者消费者模型问题,它是继承Queue的,所以可以使用Queue中的方法并且JoinableQueue又多了两个方法:

  • q.join() 用于生产者。该方法会等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
  • q.task_done() 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。

假设生产者生产了100个数据,join就先记录下100这个数据。每次消费者消费一个数据,task_done就会返回一个标识,当生产者(join)接收到100个消费者返回来的标识的时候,生产者就能知道消费者已经把所有数据都消费完了。

八、管道

管道是用于多进程之间通信的一种方式。管道是不安全的。管道在实例化对象时,会产生两个端口,即 con1,con2 = Pipe()

  • 在单进程中使用管道,如果con1负责接收数据,con2就负责发送数据;如果con2负责接收数据,con1就负责发送数据
1
2
3
4
5
6
7
8
9
from multiprocessing import Pipe

con1, con2 = Pipe()

con1.send("abc")
print(con2.recv())

con2.send(123)
print(con1.recv())
  • 在多进程中使用管道:如果父进程使用con1收,那么子进程就必须使用con2发;
    如果父进程使用con1发,那么子进程就必须使用con2收;
    如果父进程使用con2收,那么子进程就必须使用con1发;
    如果父进程使用con2发,那么子进程就必须使用con1收;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process, Pipe

def func(con1, con2):
con1, con2 = con1, con2
con1.close() # 子进程中使用con2和主进程交互,所以con1可以关闭了
while True:
try:
print(con2.recv()) # 当主进程的con1发数据时,子进程要死循环的去接收。
except: # 如果主进程的con1发完数据并关闭con1,子进程的con2继续接收时,就会报错,使用try的方式,获取错误
con2.close() # 获取到错误,就是指子进程已经把管道中所有数据都接收完了,所以用这种方式去关闭管道
break


if __name__ == '__main__':
con1, con2 = Pipe()
p = Process(target=func, args=(con1, con2))
p.start()
con2.close() # 主进程中使用con1和子进程交互,所以con2可以关闭了
for i in range(10): # 生产数据
con1.send(i) # 给子进程的con2发送数据
con1.close() # 生产完数据,关闭父进程这一端的管道

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产进程和消费进程的工作能力来提高程序的整体处理数据的速度。举个应用栗子:全栈开发时候,前端接收客户请求,后端处理请求逻辑。当某时刻客户请求过于多的时候,后端处理不过来,此时完全可以借助队列来辅助,将客户请求放入队列中,后端逻辑代码处理完一批客户请求后马上从队列中继续获取,这样平衡两端的效率。

为什么要使用生产者和消费者模式

在进程世界里,生产者就是生产数据的进程,消费者就是消费数据的进程。在多进程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现的生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 第一版
from multiprocessing import Process, Queue

# 消费者函数
def consumer(q):
while True:
info = q.get()
print("我吃到了" + info)

# 生产者函数
def productor(q):
for i in range(20):
info = f"包子{str(i+1)}号"
q.put(info)

if __name__ == '__main__':
q = Queue(5)
pro_con = Process(target=consumer, args=(q,))
pro_con.start()
pro_pro = Process(target=productor, args=(q,))
pro_pro.start()

第一版代码中存在这样一个问题,生产者生产完数据后正常结束代码了,但是消费者会不断的从队列中取数据,如果获取不到,get 方法就会阻塞,导致整个程序无法结束,所以我们要修改一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 第二版
from multiprocessing import Process, Queue
import time

# 消费者函数
def consumer(q):
while True:
try:
info = q.get_nowait()
print("我吃到了" + info)
except:
print("包子取完了")
break

# 生产者函数
def productor(q):
for i in range(20):
time.sleep(0.5)
info = f"包子{str(i + 1)}号"
print(info)
q.put(info)

if __name__ == '__main__':
q = Queue(20)
pro_con = Process(target=consumer, args=(q,))
pro_pro = Process(target=productor, args=(q,))
pro_con.start()
pro_pro.start()

第二版在消费者函数中加了一个判断,如果不能取到数据,就退出。但是这个逻辑是有问题的,如果生产者的速度比较慢,数据还没来及放到队列中时,消费者已经去取了,发现没取到就判断为生产者已经全部生产完了,自己就退出了。而实际上生产者还在不断的生产数据,所以为了保险起见,生产者在生产完所有的数据后,可以在队列中加一个结束的标识,告诉消费者我已经生产结束了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 第三版
from multiprocessing import Process, Queue

# 消费者函数
def consumer(q):
while True:
info = q.get()
if info is None:
print("包子取完了")
break
else:
print("我吃到了" + info)

# 生产者函数
def productor(q):
for i in range(20):
info = f"包子{str(i + 1)}号"
q.put(info)
q.put(None) # 生产结束了,放入一个结束的标志来提醒消费者


if __name__ == '__main__':
q = Queue(5)
pro_con = Process(target=consumer, args=(q,))
pro_pro = Process(target=productor, args=(q,))
pro_con.start()
pro_pro.start()

这次可以完美实现了,但是这是只有一个生产者和一个消费者的情况,如果有一个生产者和多个消费者,生产者放入的结束标识被某一个消费者拿到后,后面几个消费者就没有办法再次拿到了,所以还是会阻塞,因此,有多少个消费者就应该有多少个结束标识,再次基础上我们再来优化一下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 终极版
from multiprocessing import Process, Queue

# 消费者函数
def consumer(q):
while True:
info = q.get()
if info is None:
break
else:
print("我吃到了" + info)

# 生产者函数
def productor(q):
for i in range(20):
info = f"包子{str(i + 1)}号"
q.put(info)

if __name__ == '__main__':
q = Queue(5)
pro_con1 = Process(target=consumer, args=(q,))
pro_con2 = Process(target=consumer, args=(q,))
pro_con3 = Process(target=consumer, args=(q,))

pro_pro1 = Process(target=productor, args=(q,))
pro_pro2 = Process(target=productor, args=(q,))

pro_con1.start()
pro_con2.start()
pro_con3.start()
pro_pro1.start()
pro_pro2.start()

# 我们可以把添加结束标识放在主进程中操作,主进程必须等生产者生产完后再添加标识
pro_pro1.join()
pro_pro2.join()

# 有几个消费者就添加几个标识
q.put(None)
q.put(None)
q.put(None)

基于可连接队列实现的生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from multiprocessing import JoinableQueue, Process

def consumer(q):
while True:
info = q.get()
print("我吃到了" + info)
q.task_done() # 没吃到一个包子就告诉生产者一下,生产者的计数器(join)就会自减 1
def productor(q):
for i in range(20):
info = f"包子{str(i + 1)}号"
q.put(info)
q.join() # 此时生产者会将生产的包子的数目,即20,记录下来,阻塞等待消费者消费完队列中所有的数据(即join计数为0)再解除阻塞

if __name__ == '__main__':
q = JoinableQueue(10)
p_pro = Process(target=productor, args=(q,))
p_con = Process(target=consumer, args=(q,))

# 将消费者设置成守护进程
p_con.daemon = True

p_pro.start()
p_con.start()

# 主进程等待生产者全部结束再结束
p_pro.join()

实现的逻辑是这样的,首先生产者由于q.join的缘故,要等到消费者的task_done不断返回,等 join 计数变为0的时候,生产者才能结束;主进程由于p_pro.join()的缘故,必须等待生产者结束,自身的代码才能执行结束,而一旦主进程的代码执行结束了,消费者这个守护进程就会跟着结束,因此就不会存在消费者循环阻塞的问题了。

进程池

进程池实际上就是一个池子,它会帮程序员去管理池中的进程。进程池里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数,其次也不是开的进程越多,效率越高。开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让 CPU 帮你调度它。一边拿来说,进程数量是 CPU 核数 + 1 时,对CPU的利用率是最高的。

一、使用进程池的三个方法

map(func,iterable) 方法

  • func:进程池中的进程执行的任务函数
  • iterable:可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Pool

def func(num):
num += 1
print(num)
return num

if __name__ == '__main__':
p = Pool(5)
res = p.map(func, [i for i in range(100)])
p.close()
p.join()
print('主进程中map的返回值', res)

apply(func,args=()) 方法

  • func:进程池中的进程执行的任务函数
  • args:可迭代对象型的参数,是传给任务函数的参数

该方法为同步方法,即根据进程池中的进程数量接入对应数量的任务,但是进程是一个一个执行的。同步处理任务时,不需要close和join,进程池中的所有进程是普通进程,即主进程需要等待其执行结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Pool
import os

def func(num):
num += 1
return num

if __name__ == '__main__':
pool = Pool(os.cpu_count() + 1)
for i in range(100): # 同步处理这100个任务,同步是指,哪怕我进程中有5个进程,也依旧是1个进程1个进程的去执行任务
res = pool.apply(func,args=(i,))
print(res)

apply_async(func,args=(),callback=None) 方法

  • func:进程池中的进程执行的任务函数
  • args:可迭代对象型的参数,是传给任务函数的参数
  • callback:回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的

异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束),所以异步处理任务时,必须要加上close和join,同步处理不需要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Pool
import time

def func(num):
num += 1
return num

if __name__ == '__main__':
p = Pool(5)
start = time.time()
l = []
for i in range(10000):
# 异步处理这100个任务,异步是指,进程中有5个进程,一下就处理5个任务,接下来哪个进程处理完任务了,就马上去接收下一个任务
res = p.apply_async(func,args=(i,)) # 得到的结果是一个AsyncResul的实例obj,先将结果放入列表
l.append(res)
p.close() # 关闭进程池,防止还有其他的任务过来
p.join() # 阻塞等待进程池中的子进程执行结束
print(time.time() - start)

[print(i.get()) for i in l]

# 异步机制执行的结果是AsyncResul的实例obj,需要通过get方法得到实际结果;同步机制没有此方法,同步机制能直接拿到实际结果。
# 其实get是阻塞等待的,也就是说,如果没有上边的close和join,主进程一样会阻塞在get等待进程池中给返回结果。进程池异步执行任务获取结果,每次有一个进程返回结果后,就能get到一个结果,然后for循环到下一次继续阻塞等待拿结果,所以为了不影响执行效率,通常是将进程池执行的初始结果统一保存,然后统一取值

二、回调函数的使用

进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作。回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from multiprocessing import Pool
import requests
import time,os

def func(url):
res = requests.get(url)
print('子进程的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))
# print(res.text)
if res.status_code == 200:
return url,res.text

def record_result(res):
url,text = res
print('回调函数的pid', os.getpid()) # 所有打印出来的回调函数的pid都和主进程是一样的,所以回调函数都是由主进程负责调用的
with open('a.txt','a',encoding='utf-8') as f:
f.write(url + text)
# print('回调函数中!',url)

if __name__ == '__main__':
p = Pool(5)
l = ['https://www.baidu.com',
'http://www.jd.com',
'http://www.taobao.com',
'http://www.mi.com',
'http://www.cnblogs.com',
'https://www.bilibili.com',
]
print('主进程的pid',os.getpid())
for i in l:
p.apply_async(func, args=(i,),callback=record_result)#
# 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
p.close()
p.join()

Python 多进程开发
https://clark-cdc.github.io/2019/05/13/0015-多进程/
作者
clark
发布于
2019年5月13日
许可协议