进程的创建以及使用 一、创建进程 方式一:直接创建
process 模块是 multiprocessing库一个创建进程的模块,借助这个模块,就可以完成进程的创建。
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任 务(尚未启动)
参数说明:
group 参数未使用,值始终为None
target 表示调用对象,即子进程要执行的任务
args 表示调用对象的位置参数元组,args=(1,2,’egon’,)
kwargs 表示调用对象的字典,kwargs={‘name’:’egon’,’age’:18}
name 为子进程的名称
1 2 3 4 5 6 7 8 9 10 11 from multiprocessing import Processimport osdef func (name ): print ("我是一个子进程%s,我的进程号是%s,我的父进程的进程号是%s" % (name, os.getpid(), os.getppid()))if __name__ == '__main__' : print ("我是父进程,我的进程号是%s" % os.getpid()) p = Process(target=func, args=("cdc" ,)) p.start()
方式二:通过类继承的方式来创建
1 2 3 4 5 6 7 8 9 10 class MyProcess (Process ): def __init__ (self ): super ().__init__() def run (self ): print ("开始启动子进程" )if __name__ == '__main__' : p = MyProcess() p.start()
Process对象除了有 start 方法外,还有一个 run 方法,两者的区别是:
p.start():是指解释器告诉操作系统去帮我开启一个进程,至于什么时候执行,由操作系统来调度,即处于就绪状态
p.run():是指告诉操作系统,现在马上帮我执行这个子进程,即处于执行状态
当通过类继承的方式来创建一个子进程的时候,执行 start 方法会自动执行类中定义的 run 方法。
方式三:开启多个不同的子进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def func (): print ("子进程 %s,父进程 %s" % (os.getpid(), os.getppid()))if __name__ == '__main__' : print ("父进程 %s" % os.getpid()) for i in range (5 ): p = Process(target=func, args=()) p.start() """ 父进程 7476 子进程 7876,父进程 7476 子进程 16044,父进程 7476 子进程 13908,父进程 7476 子进程 18332,父进程 7476 子进程 9312,父进程 7476 """
注:在Windows操作系统中由于没有 fork (linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将Process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用 if __name__ ==’__main__‘ 判断保护起来,import 的时候 ,就不会递归运行了。
二、进程常用方法 1 2 3 4 5 p .start ():启动进程,并调用该子进程中的p .run () p .run ():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p .terminate ():强制终止进程p ,不会进行任何清理操作,如果p 创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p 还保存了一个锁那么也将不会被释放,进而导致死锁p .is_alive ():如果p 仍然运行,返回Truep .join ([timeout] ):主线程等待p 终止(强调:是主线程处于等的状态,而p 是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p .join 只能join住start开启的进程,而不能join住run开启的进程
1 2 3 4 5 6 7 8 9 10 11 12 from multiprocessing import Processimport timedef func (): time.sleep(200 ) if __name__ == '__main__' : p = Process(target=func, args=()) p.start() print (p.is_alive()) time.sleep(10 ) print (p.is_alive())
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Processimport timedef func (): time.sleep(200 ) if __name__ == '__main__' : p = Process(target=func, args=()) p.start() print (p.is_alive()) p.terminate() print (p.is_alive()) print (p.is_alive())
理论上来说,杀死子进程后得到的进程存活状态应该是 False,但是实际测试得到的还是 True,这是由于操作系统在切换和调度进程时都需要时间,而我们的代码执行的速度特别快,我们无法直接捕捉该现象,可以让程序等待一定的时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Processimport timedef func (): time.sleep(200 ) if __name__ == '__main__' : p = Process(target=func, args=()) p.start() print (p.is_alive()) p.terminate() time.sleep(0.1 ) print (p.is_alive()) print (p.is_alive())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from multiprocessing import Processimport timedef func (): for i in range (500 ): time.sleep(0.01 ) print ("我是子进程" )if __name__ == '__main__' : p = Process(target=func) p.start() for i in range (100 ): time.sleep(0.01 ) print ("我是父进程" ) """ ... 我是父进程 我是父进程 我是父进程 我是子进程 我是父进程 我是子进程 我是父进程 我是子进程 我是父进程 ... """
开启一个正常的子进程,父进程会等待子进程结束后,父进程也就是程序才结束,但是并不是先执行完子进程的方法后才会去执行父进程中的方法,父进程和子进程是异步执行的。
p.join() 方法就可以使父进程和子进程变为同步执行,父进程执行到join方法时,就会阻塞住,等待子进程执行完成后再继续执行主进程中的代码。
join必须放在start()后边
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from multiprocessing import Processimport timedef func (): for i in range (500 ): time.sleep(0.01 ) print ("我是子进程" )if __name__ == '__main__' : p = Process(target=func) p.start() p.join() for i in range (100 ): time.sleep(0.01 ) print ("我是父进程" ) """ ... 我是子进程 我是子进程 我是子进程 我是父进程 我是父进程 我是父进程 ... """
多个子进程和主进程实现同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from multiprocessing import Processimport timedef func (num ): time.sleep(0.5 ) print (num ** 2 )if __name__ == '__main__' : p_l = list () for i in range (10 ): p = Process(target=func, args=(i,)) p.start() p_l.append(p) [p.join() for p in p_l] time.sleep(0.5 ) print ("计算结束" )
三、进程常用属性 1 2 3 4 5 p .daemon :默认值为False,如果设为True,代表p 为后台运行的守护进程,当p 的父进程终止时,p 也随之终止,并且设定为True后,p 不能创建自己的新进程,必须在p .start ()之前设置p .name :进程的名称p .pid :进程的pidp .exitcode :进程在运行时为None ,如果为–N,表示被信号N结束(了解即可)p .authkey :进程的身份验证键,默认是由os.urandom ()随机生成的32 个字符。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
1 2 3 4 5 6 7 8 9 10 11 12 from multiprocessing import Processimport osdef func (): print (f"这里是子进程{os.getpid()} " )if __name__ == '__main__' : for i in range (3 ): p = Process(target=func) p.name = f"cdc{str (i)} " p.start() print (f"子进程{p.name} 开始执行,进程号{p.pid} " )
使用 p.daemon = True 可以将普通的子进程设置成守护进程。对于一个正常的子进程,主进程和子进程是异步的执行各自的代码的,如果主进程代码已经执行结束,子进程代码还未执行完,那么主进程就会等待所有的子进程执行结束后再结束,即结束整个程序;而对于守护进程而言,会随着主进程代码的执行结束而立即结束 。
注意:守护进程是随着主进程的代码的结束而结束,而不是随着主进程的结束而结束。换句话说,正常执行中主进程永远是最后结束的,哪怕自己的代码已经全部执行完了,也不会结束,而是阻塞等待所有的子进程执行完了再结束,即结束整个程序。但只要主进程中的代码执行完了,守护进程肯定就跟着结束了。守护进程必须在 start 之前设置。
守护进程的三个特点:
守护进程会随着父进程代码的结束而结束
2. 守护进程自身不能再创建子进程
3. 守护进程必须要在start之前设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from multiprocessing import Processimport timedef func1 (): print ("这是孙子" )def func (): print ("这是儿子" ) time.sleep(1 ) p = Process(target=func1) p.start()if __name__ == '__main__' : p = Process(target=func) p.daemon = True p.start() print (p.daemon) time.sleep(5 ) print ("这是爸爸" )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from multiprocessing import Processimport timedef func (): time.sleep(10 ) print ("子进程代码结束" )if __name__ == '__main__' : p = Process(target=func) p.start() time.sleep(3 ) print ("父进程代码执行结束" )""" 父进程代码执行结束 子进程代码结束 Process finished with exit code 0 """ from multiprocessing import Processimport timedef func (): time.sleep(10 ) print ("子进程代码结束" )if __name__ == '__main__' : p = Process(target=func) p.daemon = True p.start() time.sleep(3 ) print ("父进程代码执行结束" )""" 父进程代码执行结束 Process finished with exit code 0 """
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from multiprocessing import Processimport timedef func (): for i in range (10 ): time.sleep(1 ) print (time.strftime('%H:%M:%S' ))if __name__ == '__main__' : p = Process(target=func) p.daemon = True p.start() time.sleep(5 ) print ('这是爸爸' )
进程间通信 一、进程间无法共享内存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processdef func (num ): num -= 1 if __name__ == '__main__' : num = 100 p_l = list () for i in range (5 ): p = Process(target=func, args=(num,)) p.start() p_l.append(p) [p.join() for p in p_l] print (num)
以上代码创建了五个子进程,分别对全局变量 num 进行了减1的操作,理论上来说,num 最后的值应该是95,然而测试得到的结果还是100,这是因为每次执行子进程实际相当于重新开辟了一个空间,将子进程的代码和相关的变量都拷贝一份过去再操作,所以不管子进程中对全局的变量进行什么操作,都不会影响原本内存中变量对应的值,这也就是多进程之间无法共享内存的原因。
二、进程间直接的数据共享 虽然进程间数据独立,但可以通过Manager和Value实现数据共享
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Process, Managerdef func (lst ): lst[0 ] = "aaa" print ("子进程中的列表" , lst) if __name__ == '__main__' : m = Manager() lst = m.list ([11 , 22 , 33 ]) print ("父进程中一开始的列表" , lst) p = Process(target=func, args=(lst,)) p.start() p.join() print ("父进程中后来的列表" , lst)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Process, Valuedef func (num ): num.value -= 1 print ("子进程中的num值" , num.value) if __name__ == '__main__' : num = Value("i" , 100 ) print ("父进程中一开始的num" , num.value) p = Process(target=func, args=(num,)) p.start() p.join() print ("父进程中后来的num" , num.value)
除了直接实现数据共享外,multiprocessing 中可以通过以下机制来实现进程间通信(IPC,inter process Communication)
三、锁机制 多个进程同时共享内存时会出现数据错乱的问题,例如如果多个进程同时向一个文件中写入,可能会出现一个进程刚写完还未来得及将写好的内容保存,另外一个进程已经将文件关闭了,此时第一个进程所写入的内容就丢失了。因此,我们可以给这个文件上一把锁,如果当前有进程正在操作文件,那么就不允许其他的进程再来操作这个文件了,只能等当前的进程操作结束后,再把锁打开,其他进程才能来使用文件。重复上述步骤,就不会出现数据错乱丢失的问题了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Lock l = Lock() l.acquire() l.release() from multiprocessing import Lock l = Lock() l.acquire()print ("aaaa" ) l.acquire()print ("bbbb" )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Lock, Processimport osdef hand_file (l ): l.acquire() with open (file="a.txt" , mode="a+" , encoding="utf-8" ) as f: f.write(f"我是子进程{os.getpid()} " + "\n" ) l.release() if __name__ == '__main__' : l = Lock() for i in range (5 ): p = Process(target=hand_file, args=(l,)) p.start()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from multiprocessing import Lock, Process, Valueimport timedef get_money (l, money ): l.acquire() for i in range (6 ): money.value -= 100 time.sleep(0.5 ) l.release() def save_money (l, money ): l.acquire() for i in range (3 ): money.value += 100 time.sleep(0.5 ) l.release() if __name__ == '__main__' : money = Value("i" , 1000 ) l = Lock() p1 = Process(target=get_money, args=(l, money)) p1.start() p2 = Process(target=save_money, args=(l, money)) p2.start() p1.join() p2.join() print (money.value)
进程中还会存在死锁 情况,需要借助递归锁 来解决,由于死锁和递归锁原理和多线程相同,此处在多线程中再详细介绍。
四、信号量机制 信号量机制相当于是一把锁配好几把钥匙。信号量机制比锁机制多了一个计数器,这个计数器是用来记录当前剩余几把钥匙的。对于计数器来说,每acquire一次,计数器内部就减1,release一次,计数器就加1,当计数器为0时,表示没有钥匙了,此时acquire()处于阻塞,直到有钥匙归还才解除阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Semaphore sem = Semaphore(5 ) sem.acquire() print (1111 ) sem.acquire() print (2222 ) sem.acquire() print (3333 ) sem.acquire() print (4444 ) sem.acquire() print (5555 ) sem.acquire() print (6666 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Semaphore, Processimport random, timedef eat (i, sem ): sem.acquire() print (f"第{str (i)} 位客人准备吃饭了" ) time.sleep(random.uniform(1 , 5 )) print (f"第{str (i)} 位客人吃完了" ) sem.release() if __name__ == '__main__' : sem = Semaphore(5 ) for i in range (20 ): p = Process(target=eat, args=(i, sem)) p.start()
五、事件机制 事件机制 Event 对象主要有以下几种方法:
is_set():is_set 的值是布尔类型,事件机制通过该值来判断 wait 方法是否应该处于阻塞状态,is_set 的值为 True,则 wait 处于非阻塞状态,否则处于阻塞状态
wait():判断is_set的值,如果为True,则非阻塞;值为False,则阻塞
set():将is_set设为True
clear():将is_set设为False
1 2 3 4 5 6 7 8 9 10 11 12 from multiprocessing import Process, Event e = Event() print (e.is_set()) e.set ()print (e.is_set()) e.wait()print ("aaaa" ) e.clear()print (e.is_set()) e.wait()print ("bbbb" )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from multiprocessing import Process, Eventimport timedef traffic_light (e ): while True : if e.is_set(): time.sleep(5 ) print ("红灯亮了" ) e.clear() else : time.sleep(3 ) print ("绿灯亮了" ) e.set () def Car (e, i ): e.wait() print (f"第{str (i)} 辆车通过" )if __name__ == '__main__' : e = Event() light = Process(target=traffic_light, args=(e,)) light.start() for i in range (50 ): if i % 3 == 0 : time.sleep(5 ) p = Process(target=Car, args=(e, i)) p.start()
六、队列
队列:先进先出(FIFO,First In First Out)
栈:先进后出(FILO,First In Last Out)
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递,队列在进程间通信是安全的 。队列有以下常用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 q .get ( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在指定的时间间隔内没有项目可用,将引发Queue.Empty异常。q .get_nowait ( ) 同q .get (False)方法。q .put (item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Full异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。q .qsize () 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。q .empty () 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。q .full () 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q .empty ()方法)q .close () 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。q .cancel_join_thread () 不会再进程退出时自动连接后台线程。这可以防止join_thread ()方法阻塞。q .join_thread () 连接队列的后台线程。此方法用于在调用q .close ()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q .cancel_join_thread ()方法可以禁止这种行为。
1 2 3 4 5 6 7 8 9 10 from multiprocessing import Queue q = Queue(3 ) q.put(123 ) q.put("abc" ) q.put([1 , 2 , 3 ])print ("到此为止队列已经存满了" ) q.put(236 , False )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Queue q = Queue(3 ) q.put(123 ) q.put("abc" ) q.put([1 , 2 , 3 ])print (q.get()) print (q.get()) print (q.get()) print ("此时队列已经全部取完了" )print (q.get(False ))
1 2 3 4 5 6 7 8 9 from multiprocessing import Queue q = Queue(3 ) q.put(123 ) q.put("abc" ) q.put([1 , 2 , 3 ])print ("到此为止队列已经存满了" ) q.put_nowait(236 )
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Queue q = Queue(3 ) q.put(123 ) q.put("abc" ) q.put([1 , 2 , 3 ])print (q.get()) print (q.get()) print (q.get()) print ("此时队列已经全部取完了" )print (q.get_nowait())
1 2 3 4 5 6 7 8 9 10 11 from multiprocessing import Queue,Processdef func (q ): q.put('我是四川的' )if __name__ == '__main__' : q = Queue(5 ) p = Process(target=func,args=(q,)) p.start() print (q.get())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from multiprocessing import Process, Queue,freeze_supportimport randomimport osdef put_func (q ): info = str (os.getpid()) + '\t:\t' + str (random.randint(0 , 100 )) q.put(info)def get_func (q ): print ('%s 获取到数据 :\033[33m; %s \033[0m' % (os.getpid(), q.get()))if __name__ == '__main__' : q = Queue(5 ) l_put = [] l_get = [] for i in range (10 ): p_put = Process(target=put_func, args=(q,)) p_put.start() l_put.append(p_put) for i in range (10 ): p_get = Process(target=get_func, args=(q,)) p_get.start() l_put.append(p_get) [i.join() for i in l_put] [i.join() for i in l_get]
七、JoinableQueue 可连接的队列 JoinableQueue常用于解决生产者消费者模型问题,它是继承Queue的,所以可以使用Queue中的方法并且JoinableQueue又多了两个方法:
q.join() 用于生产者。该方法会等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
q.task_done() 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。
假设生产者生产了100个数据,join就先记录下100这个数据。每次消费者消费一个数据,task_done就会返回一个标识,当生产者(join)接收到100个消费者返回来的标识的时候,生产者就能知道消费者已经把所有数据都消费完了。
八、管道 管道是用于多进程之间通信的一种方式。管道是不安全的。管道在实例化对象时,会产生两个端口,即 con1,con2 = Pipe()
在单进程中使用管道,如果con1负责接收数据,con2就负责发送数据;如果con2负责接收数据,con1就负责发送数据
1 2 3 4 5 6 7 8 9 from multiprocessing import Pipe con1, con2 = Pipe() con1.send("abc" )print (con2.recv()) con2.send(123 )print (con1.recv())
在多进程中使用管道:如果父进程使用con1收,那么子进程就必须使用con2发; 如果父进程使用con1发,那么子进程就必须使用con2收; 如果父进程使用con2收,那么子进程就必须使用con1发; 如果父进程使用con2发,那么子进程就必须使用con1收;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from multiprocessing import Process, Pipedef func (con1, con2 ): con1, con2 = con1, con2 con1.close() while True : try : print (con2.recv()) except : con2.close() break if __name__ == '__main__' : con1, con2 = Pipe() p = Process(target=func, args=(con1, con2)) p.start() con2.close() for i in range (10 ): con1.send(i) con1.close()
生产者消费者模型 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产进程和消费进程的工作能力来提高程序的整体处理数据的速度。举个应用栗子:全栈开发时候,前端接收客户请求,后端处理请求逻辑。当某时刻客户请求过于多的时候,后端处理不过来,此时完全可以借助队列来辅助,将客户请求放入队列中,后端逻辑代码处理完一批客户请求后马上从队列中继续获取,这样平衡两端的效率。
为什么要使用生产者和消费者模式 在进程世界里,生产者就是生产数据的进程,消费者就是消费数据的进程。在多进程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现的生产者消费者模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from multiprocessing import Process, Queuedef consumer (q ): while True : info = q.get() print ("我吃到了" + info)def productor (q ): for i in range (20 ): info = f"包子{str (i+1 )} 号" q.put(info)if __name__ == '__main__' : q = Queue(5 ) pro_con = Process(target=consumer, args=(q,)) pro_con.start() pro_pro = Process(target=productor, args=(q,)) pro_pro.start()
第一版代码中存在这样一个问题,生产者生产完数据后正常结束代码了,但是消费者会不断的从队列中取数据,如果获取不到,get 方法就会阻塞,导致整个程序无法结束,所以我们要修改一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from multiprocessing import Process, Queueimport timedef consumer (q ): while True : try : info = q.get_nowait() print ("我吃到了" + info) except : print ("包子取完了" ) break def productor (q ): for i in range (20 ): time.sleep(0.5 ) info = f"包子{str (i + 1 )} 号" print (info) q.put(info)if __name__ == '__main__' : q = Queue(20 ) pro_con = Process(target=consumer, args=(q,)) pro_pro = Process(target=productor, args=(q,)) pro_con.start() pro_pro.start()
第二版在消费者函数中加了一个判断,如果不能取到数据,就退出。但是这个逻辑是有问题的,如果生产者的速度比较慢,数据还没来及放到队列中时,消费者已经去取了,发现没取到就判断为生产者已经全部生产完了,自己就退出了。而实际上生产者还在不断的生产数据,所以为了保险起见,生产者在生产完所有的数据后,可以在队列中加一个结束的标识,告诉消费者我已经生产结束了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from multiprocessing import Process, Queuedef consumer (q ): while True : info = q.get() if info is None : print ("包子取完了" ) break else : print ("我吃到了" + info)def productor (q ): for i in range (20 ): info = f"包子{str (i + 1 )} 号" q.put(info) q.put(None ) if __name__ == '__main__' : q = Queue(5 ) pro_con = Process(target=consumer, args=(q,)) pro_pro = Process(target=productor, args=(q,)) pro_con.start() pro_pro.start()
这次可以完美实现了,但是这是只有一个生产者和一个消费者的情况,如果有一个生产者和多个消费者,生产者放入的结束标识被某一个消费者拿到后,后面几个消费者就没有办法再次拿到了,所以还是会阻塞,因此,有多少个消费者就应该有多少个结束标识,再次基础上我们再来优化一下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from multiprocessing import Process, Queuedef consumer (q ): while True : info = q.get() if info is None : break else : print ("我吃到了" + info)def productor (q ): for i in range (20 ): info = f"包子{str (i + 1 )} 号" q.put(info)if __name__ == '__main__' : q = Queue(5 ) pro_con1 = Process(target=consumer, args=(q,)) pro_con2 = Process(target=consumer, args=(q,)) pro_con3 = Process(target=consumer, args=(q,)) pro_pro1 = Process(target=productor, args=(q,)) pro_pro2 = Process(target=productor, args=(q,)) pro_con1.start() pro_con2.start() pro_con3.start() pro_pro1.start() pro_pro2.start() pro_pro1.join() pro_pro2.join() q.put(None ) q.put(None ) q.put(None )
基于可连接队列实现的生产者消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from multiprocessing import JoinableQueue, Processdef consumer (q ): while True : info = q.get() print ("我吃到了" + info) q.task_done() def productor (q ): for i in range (20 ): info = f"包子{str (i + 1 )} 号" q.put(info) q.join() if __name__ == '__main__' : q = JoinableQueue(10 ) p_pro = Process(target=productor, args=(q,)) p_con = Process(target=consumer, args=(q,)) p_con.daemon = True p_pro.start() p_con.start() p_pro.join()
实现的逻辑是这样的,首先生产者由于q.join的缘故,要等到消费者的task_done不断返回,等 join 计数变为0的时候,生产者才能结束;主进程由于p_pro.join()的缘故,必须等待生产者结束,自身的代码才能执行结束,而一旦主进程的代码执行结束了,消费者这个守护进程就会跟着结束,因此就不会存在消费者循环阻塞的问题了。
进程池 进程池实际上就是一个池子,它会帮程序员去管理池中的进程。进程池里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数,其次也不是开的进程越多,效率越高。开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让 CPU 帮你调度它。一边拿来说,进程数量是 CPU 核数 + 1 时,对CPU的利用率是最高的。
一、使用进程池的三个方法 map(func,iterable) 方法
func:进程池中的进程执行的任务函数
iterable:可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Pooldef func (num ): num += 1 print (num) return numif __name__ == '__main__' : p = Pool(5 ) res = p.map (func, [i for i in range (100 )]) p.close() p.join() print ('主进程中map的返回值' , res)
apply(func,args=()) 方法
func:进程池中的进程执行的任务函数
args:可迭代对象型的参数,是传给任务函数的参数
该方法为同步方法,即根据进程池中的进程数量接入对应数量的任务,但是进程是一个一个执行的。同步处理任务时,不需要close和join,进程池中的所有进程是普通进程,即主进程需要等待其执行结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Poolimport osdef func (num ): num += 1 return numif __name__ == '__main__' : pool = Pool(os.cpu_count() + 1 ) for i in range (100 ): res = pool.apply(func,args=(i,)) print (res)
apply_async(func,args=(),callback=None) 方法
func:进程池中的进程执行的任务函数
args:可迭代对象型的参数,是传给任务函数的参数
callback:回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的
异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束),所以异步处理任务时,必须要加上close和join,同步处理不需要
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Poolimport timedef func (num ): num += 1 return numif __name__ == '__main__' : p = Pool(5 ) start = time.time() l = [] for i in range (10000 ): res = p.apply_async(func,args=(i,)) l.append(res) p.close() p.join() print (time.time() - start) [print (i.get()) for i in l]
二、回调函数的使用 进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作。回调函数是由主进程调用的,而不是子进程 ,子进程只负责把结果传递给回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from multiprocessing import Poolimport requestsimport time,osdef func (url ): res = requests.get(url) print ('子进程的pid:%s,父进程的pid:%s' %(os.getpid(),os.getppid())) if res.status_code == 200 : return url,res.textdef record_result (res ): url,text = res print ('回调函数的pid' , os.getpid()) with open ('a.txt' ,'a' ,encoding='utf-8' ) as f: f.write(url + text) if __name__ == '__main__' : p = Pool(5 ) l = ['https://www.baidu.com' , 'http://www.jd.com' , 'http://www.taobao.com' , 'http://www.mi.com' , 'http://www.cnblogs.com' , 'https://www.bilibili.com' , ] print ('主进程的pid' ,os.getpid()) for i in l: p.apply_async(func, args=(i,),callback=record_result) p.close() p.join()