协程引入 之前我们学习了线程、进程的概念,了解了在操作系统中**进程是资源分配的最小单位,线程是CPU调度的最小单位。**按道理来说我们已经算是把 CPU 的利用率提高很多了。但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程、创建线程、以及管理他们之间的切换。
随着我们对于效率的追求不断提高,基于单线程来实现并发 又成为一个新的课题,即只用一个主线程(很明显可利用的 CPU 只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。
CPU 正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞(IO 请求等),另外一种情况是该任务计算的时间过长(CPU 分配的时间片用完了)。在单线程中,如果存在多个函数,有某个函数发生 IO 操作,我们想让程序马上切换到另一个函数去执行,以此来实现一个假的并发现象,提高 CPU 的利用率。
为此我们需要先回顾下并发的本质:切换+保存状态。我们可以借助 yield 去实现假的并发现象,因为 yield 本身就可以实现保存状态的作用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import timedef func (): print (111 ) num = 100 print (222 ) yield num print (333 ) yield num + 1 print (444 ) yield num + 2 def fff (): g = func() print (g.__next__()) print ("aaa" ) time.sleep(1 ) print (g.__next__()) print ("bbb" ) time.sleep(2 ) print (g.__next__()) print ("ccc" )if __name__ == '__main__' : fff()
每一次使用 next 取值,都是接着上一次的 yield 的位置继续往后执行,并没有重新从头开始,所以在两个函数的切换过程中,会保留原来的执行状态。那么我们就尝试使用 yield 去实现一个单线程内的伪并发,并和普通的循环实现对比效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import timedef consumer (l ): for i in l: print ("吃了包子%s" % i)def producer (): l = [] for i in range (100000000 ): l.append("包子%s" % (i + 1 )) return l start = time.time() l = producer() consumer(l)print (time.time() - start) def consumer (): while True : x = yield print ("吃了包子%s" % x)def producer (): g = consumer() next (g) for i in range (100000000 ): g.send("包子%s" % (i + 1 )) start = time.time() producer()print ('yield:' , time.time() - start)
测试可得当生产者的生产数量越大,yield 的好处就越明显。但是,yield 只能实现单纯的切换函数和保存函数状态的功能,不能实现当某一个函数遇到IO阻塞时,自动的切换到另一个函数去执行。如果只是拿yield去单纯的实现一个切换的现象,你会发现根本没有程序串行执行效率高。在Python中,有专门的实现此类功能的机制,我们称之未协程机制。协程的本质还是主要依靠于yield去实现的。
协程介绍 协程是一个比线程更加轻量级的单位,是组成线程的各个函数,是单线程下的并发,又称微线程,纤程,协程本身没有实体。协程的本质是在单线程下,由用户自己控制一个任务遇到IO阻塞了就切换另外一个任务去执行,以此来提升效率。所以协程必须具备以下的要求:
协程具有以下特点:
必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里自己保存多个控制流的上下文栈
一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet 都无法实现,就用到了gevent 模块(select机制))
协程的使用 一、greenlet 1 2 3 4 5 6 7 # greenlet 只是可以实现一个简单的切换功能,还是不能做到遇到IO就切换 # g1 = greenlet(func ) 实例化一个对象,func 为要执行的函数 # g1.switch() 用这种方式去调用func 函数,func 需要的参数也在这里传入,只需要在第一次调用的时候传入一次即可 # 当使用switch调用func 的时候,什么时候func 会停止运行? # 1 要么return # 2 要么在func 内部又遇到 switch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from greenlet import greenletimport timedef eat (name ): print ("%s在吃炸鸡" % name) time.sleep(2 ) f2.switch("tr" ) print ("%s在吃烤肉" % name) f2.switch()def drink (name ): print ('%s喝啤酒' % name) f1.switch() print ('%s喝可乐' % name)if __name__ == '__main__' : f1 = greenlet(eat) f2 = greenlet(drink) f1.switch("cdc" )
和 yield 一样,虽然实现了函数间的切换,但是无法做到遇到IO阻塞自动切换,还是会等待IO结束再顺序执行下去。
二、gevent 1 2 3 4 5 6 7 8 # gevent 可以实现 当函数中遇到io操作时,就自动的切换到另一个函数 # g1 = gevent.spawn(func ,参数) # gevent.join(g1) 等待g1指向的任务执行结束 # gevent.joinall([g1,g2,g3,g4]) 等待g1,g2,g3,g4指向的多个任务执行结束 # func 停止的原因: # 1 func 执行完了 # 2 遇到IO操作了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import geventimport timedef func1 (): print ("1 2 3 4" ) gevent.sleep(1 ) print ("3 2 3 4" ) gevent.sleep(1 )def func2 (): print ("2 2 3 4" ) gevent.sleep(1 ) print ("再来一次" )if __name__ == '__main__' : g1 = gevent.spawn(func1) g2 = gevent.spawn(func2) g1.join()
gevent 只能识别特定的 IO 操作,我们可以使用如下方式,让 gevent 能识别大多数的 IO 操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import geventfrom gevent import monkey monkey.patch_all() import timedef func1 (): print ("1 2 3 4" ) time.sleep(1 ) print ("3 2 3 4" ) time.sleep(1 )def func2 (): print ("2 2 3 4" ) time.sleep(1 ) print ("再来一次" )if __name__ == '__main__' : g1 = gevent.spawn(func1) g2 = gevent.spawn(func2) g1.join()
串行和并发的效率比较 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import geventfrom gevent import monkey monkey.patch_all()import timedef func (num ): time.sleep(1 ) print (num) start_1 = time.time()for i in range (10 ): func(i)print ("串行执行时间为:%s" % (time.time() - start_1)) start_2 = time.time() l = list ()for i in range (10 ): l.append(gevent.spawn(func, i)) gevent.joinall(l)print ("并行执行时间为:%s" % (time.time() - start_2))
串行花了十秒多,并行只花了一秒多,差距还是比较明显的。
爬虫效率对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import geventfrom gevent import monkey monkey.patch_all()import timeimport requestsdef get_response (url ): res = requests.get(url) print (url, res.status_code, len (res.text))def sync_func (url_l ): for url in url_l: get_response(url)def async_func (url_l ): tasks = list () for url in url_l: tasks.append(gevent.spawn(get_response, url)) gevent.joinall(tasks)if __name__ == '__main__' : urls = ['http://www.baidu.com' , 'http://www.jd.com' , 'http://www.taobao.com' , 'http://www.qq.com' , 'http://www.mi.com' , 'http://www.cnblogs.com' ] start_sync = time.time() sync_func(urls) print ("串行执行任务的时间:%s" % (time.time() - start_sync)) start_async = time.time() async_func(urls) print ("串行执行任务的时间:%s" % (time.time() - start_async))