Python 协程开发

协程引入

​ 之前我们学习了线程、进程的概念,了解了在操作系统中**进程是资源分配的最小单位,线程是CPU调度的最小单位。**按道理来说我们已经算是把 CPU 的利用率提高很多了。但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程、创建线程、以及管理他们之间的切换。

  随着我们对于效率的追求不断提高,基于单线程来实现并发又成为一个新的课题,即只用一个主线程(很明显可利用的 CPU 只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。

​ CPU 正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞(IO 请求等),另外一种情况是该任务计算的时间过长(CPU 分配的时间片用完了)。在单线程中,如果存在多个函数,有某个函数发生 IO 操作,我们想让程序马上切换到另一个函数去执行,以此来实现一个假的并发现象,提高 CPU 的利用率。

​ 为此我们需要先回顾下并发的本质:切换+保存状态。我们可以借助 yield 去实现假的并发现象,因为 yield 本身就可以实现保存状态的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time

def func():
print(111)
num = 100
print(222)
yield num
print(333)
yield num + 1
print(444)
yield num + 2


def fff():
g = func() # 并不会执行func函数,只是得到一个生成器对象
print(g.__next__()) # 真正开始执行 func 函数
print("aaa")
time.sleep(1)
print(g.__next__())
print("bbb")
time.sleep(2)
print(g.__next__())
print("ccc")


if __name__ == '__main__':
fff()

每一次使用 next 取值,都是接着上一次的 yield 的位置继续往后执行,并没有重新从头开始,所以在两个函数的切换过程中,会保留原来的执行状态。那么我们就尝试使用 yield 去实现一个单线程内的伪并发,并和普通的循环实现对比效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time

# 普通for循环
def consumer(l):
for i in l:
print("吃了包子%s" % i)

def producer():
l = []
for i in range(100000000):
l.append("包子%s" % (i + 1))
return l


start = time.time()
l = producer()
consumer(l)
print(time.time() - start)


# 使用yield实现伪并发
def consumer():
while True:
x = yield
print("吃了包子%s" % x)

def producer():
g = consumer()
next(g)
for i in range(100000000):
g.send("包子%s" % (i + 1))


start = time.time()
producer()
print('yield:', time.time() - start)

​ 测试可得当生产者的生产数量越大,yield 的好处就越明显。但是,yield 只能实现单纯的切换函数和保存函数状态的功能,不能实现当某一个函数遇到IO阻塞时,自动的切换到另一个函数去执行。如果只是拿yield去单纯的实现一个切换的现象,你会发现根本没有程序串行执行效率高。在Python中,有专门的实现此类功能的机制,我们称之未协程机制。协程的本质还是主要依靠于yield去实现的。

协程介绍

​ 协程是一个比线程更加轻量级的单位,是组成线程的各个函数,是单线程下的并发,又称微线程,纤程,协程本身没有实体。协程的本质是在单线程下,由用户自己控制一个任务遇到IO阻塞了就切换另外一个任务去执行,以此来提升效率。所以协程必须具备以下的要求:

  • 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。(切换+保存状态)

  • 作为上述条件的补充,协程还可以检测IO操作,在遇到IO操作的情况下才发生切换。

协程具有以下特点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet 都无法实现,就用到了gevent 模块(select机制))

协程的使用

一、greenlet

1
2
3
4
5
6
7
# greenlet 只是可以实现一个简单的切换功能,还是不能做到遇到IO就切换
# g1 = greenlet(func) 实例化一个对象,func为要执行的函数
# g1.switch() 用这种方式去调用func函数,func需要的参数也在这里传入,只需要在第一次调用的时候传入一次即可

# 当使用switch调用func的时候,什么时候func会停止运行?
# 1 要么return
# 2 要么在func内部又遇到 switch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from greenlet import greenlet
import time


def eat(name):
print("%s在吃炸鸡" % name)
time.sleep(2)
f2.switch("tr")
print("%s在吃烤肉" % name)
f2.switch()


def drink(name):
print('%s喝啤酒' % name)
f1.switch()
print('%s喝可乐' % name)


if __name__ == '__main__':
f1 = greenlet(eat)
f2 = greenlet(drink)
f1.switch("cdc")

和 yield 一样,虽然实现了函数间的切换,但是无法做到遇到IO阻塞自动切换,还是会等待IO结束再顺序执行下去。

二、gevent

1
2
3
4
5
6
7
8
# gevent 可以实现  当函数中遇到io操作时,就自动的切换到另一个函数
# g1 = gevent.spawn(func,参数)
# gevent.join(g1) 等待g1指向的任务执行结束
# gevent.joinall([g1,g2,g3,g4]) 等待g1,g2,g3,g4指向的多个任务执行结束

# func停止的原因:
# 1 func执行完了
# 2 遇到IO操作了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
import time


def func1():
print("1 2 3 4")
gevent.sleep(1)
# time.sleep(1) # gevent只能识别特定的IO操作,time.sleep无法识别
print("3 2 3 4")
gevent.sleep(1)


def func2():
print("2 2 3 4")
gevent.sleep(1)
print("再来一次")


if __name__ == '__main__':
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
g1.join() # 等待g1指向的任务执行结束

gevent 只能识别特定的 IO 操作,我们可以使用如下方式,让 gevent 能识别大多数的 IO 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import gevent
from gevent import monkey
monkey.patch_all() # 让gevent 可以识别大多数的IO操作
import time


def func1():
print("1 2 3 4")
# gevent.sleep(1)
time.sleep(1)
print("3 2 3 4")
# gevent.sleep(1)
time.sleep(1)


def func2():
print("2 2 3 4")
# gevent.sleep(1)
time.sleep(1)
print("再来一次")


if __name__ == '__main__':
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
g1.join() # 等待g1指向的任务执行结束

串行和并发的效率比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import gevent
from gevent import monkey
monkey.patch_all()
import time

def func(num):
time.sleep(1)
print(num)


# 串行
start_1 = time.time()
for i in range(10):
func(i)
print("串行执行时间为:%s" % (time.time() - start_1))

# 并行
start_2 = time.time()
l = list()
for i in range(10):
l.append(gevent.spawn(func, i))
gevent.joinall(l)
print("并行执行时间为:%s" % (time.time() - start_2))

串行花了十秒多,并行只花了一秒多,差距还是比较明显的。

爬虫效率对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import gevent
from gevent import monkey
monkey.patch_all()
import time
import requests


# 任务函数
def get_response(url):
res = requests.get(url)
print(url, res.status_code, len(res.text))


# 串行去爬取(同步调用)
def sync_func(url_l):
for url in url_l:
get_response(url)


# 并行去爬取,异步调用
def async_func(url_l):
tasks = list()
for url in url_l:
# 使用gevent协程去并发实现执行任务函数,当遇见请求某个网页发生比较大的网络延迟(IO),马上会切换到其他的任务函数
tasks.append(gevent.spawn(get_response, url))
gevent.joinall(tasks)


if __name__ == '__main__':
urls = ['http://www.baidu.com',
'http://www.jd.com',
'http://www.taobao.com',
'http://www.qq.com',
'http://www.mi.com',
'http://www.cnblogs.com']

start_sync = time.time()
sync_func(urls)
print("串行执行任务的时间:%s" % (time.time() - start_sync))

start_async = time.time()
async_func(urls)
print("串行执行任务的时间:%s" % (time.time() - start_async))

Python 协程开发
https://clark-cdc.github.io/2019/05/22/0017-协程/
作者
clark
发布于
2019年5月22日
许可协议