if __name__ == '__main__': t = MyThread() t.start()
线程和进程的对比
时间方面,开启多个线程的速度比进程快
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
deffunc(): pass
if __name__ == '__main__': p_start = time.time() for i inrange(1000): p = Process(target=func, args=()) p.start() print("开启1000个进程的时间:%s" % (time.time() - p_start))
t_start = time.time() for i inrange(1000): t = Thread(target=func, args=()) t.start() print("开启1000个线程的时间:%s" % (time.time() - t_start))
if __name__ == '__main__': print("我是 mian,我的pid是 %s" % os.getpid()) for i inrange(3): p = Process(target=func, args=("子进程",)) p.start() for i inrange(3): t = Thread(target=func, args=("子线程",)) t.start()
操作全局变量方面:进程共享全局变量需要通过 Value 或者 Manger 以及锁机制的配合使用,线程可以直接共享全局变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from threading import Thread
deffunc(): global num num -= 1
if __name__ == '__main__': num = 100 t_l = [] for i inrange(10): t = Thread(target=func, args=()) t.start() t_l.append(t) [t.join() for t in t_l] print(num)
共享全局变量引申:GIL 的影响
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from threading import Thread import time
deffunc(): global num tmp = num time.sleep(0.1) num = tmp - 1
if __name__ == '__main__': num = 100 t_l = [] for i inrange(10): t = Thread(target=func, args=()) t.start() t_l.append(t) [t.join() for t in t_l] print(num) # 99
此时得到的结果并不是 90,这是由于 GIL 锁机制导致的。我们具体来分析一下原因:
当第一个线程创建并启动后,执行到 time.sleep() 时会阻塞睡眠等待,由于 GIL 在同一时间只允许有一个线程访问 CPU,且一般给线程执行的时间片只有5毫秒,所以在线程1睡眠阻塞的时候,会把线程1 踢出执行队列,去执行线程2;所有的线程都重复上述步骤,因此所有的线程都只执行到 sleep,后面的 num = tmp - 1 操作都未执行到。当10个线程都阻塞睡眠结束之后,GIL再重新逐个接入所有的线程继续向下执行,此时每个线程的 tmp 都还是原来的 100,并没有进行所谓的递减操作。
deffunc(l): global num l.acquire() tmp = num time.sleep(0.1) num = tmp - 1 l.release()
if __name__ == '__main__': num = 100 l = Lock() t_l = [] for i inrange(10): t = Thread(target=func, args=(l,)) t_l.append(t) t.start() [t.join() for t in t_l] print(num)
if __name__ == '__main__': e = Event() t = Thread(target=check_mysql, args=(e,)) t.start() for i inrange(10): # 产生10个线程都去尝试连接数据库 t1 = Thread(target=conn_mysql, args=(e, i)) t1.start()
if __name__ == '__main__': con = Condition() for i inrange(10): t = Thread(target=func, args=(con, i)) t.start() while1: num = int(input('>>>')) con.acquire() con.notify(num) # 手动输入制造几把钥匙,相当于发一个信号,允许几个线程可以执行了 con.release()
#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class.
deffunc(n): sum = 0 for i inrange(n + 1): sum += i ** 2 returnsum
if __name__ == '__main__': pp = ThreadPoolExecutor(5) # 线程池中允许有5个线程同时工作 pp_l= list() for i inrange(15): s = pp.submit(func, i) pp_l.append(s) pp.shutdown()
[print(s.result()) for s in pp_l] # 取出返回值
三、进程池的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from concurrent.futures import ProcessPoolExecutor
deffunc(n): sum = 0 for i inrange(n + 1): sum += i ** 2 returnsum
if __name__ == '__main__': pp = ProcessPoolExecutor(5) # 线程池中允许有5个进程同时工作 pp_l = list() for i inrange(15): s = pp.submit(func, i) pp_l.append(s) pp.shutdown()
# 下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是result # res_l = [] # for i in range(1000): # re = t.submit(func,i) # res_l.append(re) # # t.shutdown() # [print(i.result()) for i in res_l] # 在Pool进程池中拿结果,是用get方法。 在ThreadPoolExecutor里边拿结果是用result方法
if __name__ == '__main__': print(os.getpid()) t = ProcessPoolExecutor(20) for i inrange(1000): t.submit(func, i).add_done_callback(call_back_fun) t.shutdown()
from threading import Thread from threading import current_thread from concurrent.futures import ThreadPoolExecutor import time deffunc(i): sum = 0 sum += i time.sleep(1) print('这是在子线程中',current_thread()) returnsum
if __name__ == '__main__': t = ThreadPoolExecutor(5) for i inrange(10): t.submit(func,i).add_done_callback(call_back) t.shutdown() print('这是在主线程中',current_thread())