Python IO多路复用

IO 模型

  • blocking IO 阻塞IO
  • nonblocking IO 非阻塞IO
  • IO multiplexing IO多路复用
  • signal driven IO 信号驱动IO(在实际中并不常用)
  • asynchronous IO 异步IO(python实现不了,但是有tornado框架天生自带异步)

IO发生时涉及的对象和步骤举例:

对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

  1. 等待数据准备
  2. 将数据从内核拷贝到进程中

以上提及的 IO 模型的区别就是在这两个阶段上各有不同的情况。

阻塞IO

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

​ 当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

​ 而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

​ 几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

我们可以使用以下方式解决阻塞IO问题,但是都有缺陷:

1
# 在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

该方案的问题是:

1
# 开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

改进方案:

1
# 很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进后方案其实也存在着问题:

1
# “线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

非阻塞IO

可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

​ 当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

		也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 非阻塞IO版套接字server端示例第一版
import socket

sk = socket.socket()
sk.setblocking(False) # 设置为非阻塞,但是没有客户端连接的情况下会报错
sk.bind(("127.0.0.1", 8080))
sk.listen()

l = list() # 用于保存连接的客户端

while True:
try:
# 如果是阻塞IO模型,没有客户端连接在这里程序会一直等待。
# 如果是非阻塞IO模型,没有客户端连接会报错,所以要做异常处理
conn, addr = sk.accept()
l.append(conn) # 为了方便对多个客户端进行操作,可以将连接上的客户端先统一保存
except BlockingIOError:
continue

这样就不会因为某一个客户端导致阻塞,而无法去和其他客户端进行交互了,下面我们继续对连接上的客户端进行操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 非阻塞IO版套接字示例第二版
import socket

sk = socket.socket()
sk.setblocking(False) # 设置为非阻塞,但是没有客户端连接的情况下会报错
sk.bind(("127.0.0.1", 8080))
sk.listen()

l = list() # 用于保存连接的客户端

while True:
try:
# 如果是阻塞IO模型,没有客户端连接在这里程序会一直等待。
# 如果是非阻塞IO模型,没有客户端连接会报错,所以要做异常处理
conn, addr = sk.accept()
l.append(conn) # 为了方便对多个客户端进行操作,可以将连接上的客户端先统一保存
except BlockingIOError:
for conn in l: # 遍历每一个连接上的客户端,看看有没有数据过来
try:
# 尝试接收,看看客户端有没有数据过来
# 如果客户端没有发送数据,还是会报阻塞错误,需要做异常处理
# 如果客户端强制退出,也会报错,需要进行异常处理
info = conn.recv(1024).decode("utf-8")
except BlockingIOError: # 处理客户端没有数据发送过来
continue
except ConnectionResetError: # 处理客户端强制关闭
continue

对于正常退出的客户端以及正常发送数据的客户端进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 非阻塞IO版套接字示例第三版
import socket

sk = socket.socket()
sk.setblocking(False) # 设置为非阻塞,但是没有客户端连接的情况下会报错
sk.bind(("127.0.0.1", 8080))
sk.listen()

l = list() # 用于保存连接的客户端
del_l = list() # 用于保存没有发送信息要关闭掉的客户端

while True:
try:
# 如果是阻塞IO模型,没有客户端连接在这里程序会一直等待。
# 如果是非阻塞IO模型,没有客户端连接会报错,所以要做异常处理
conn, addr = sk.accept()
l.append(conn) # 为了方便对多个客户端进行操作,可以将连接上的客户端先统一保存
except BlockingIOError:
for conn in l: # 遍历每一个连接上的客户端,看看有没有数据过来
try:
# 尝试接收,看看客户端有没有数据过来
# 如果客户端没有发送数据,还是会报阻塞错误,需要做异常处理
# 如果客户端强制退出,也会报错,需要进行异常处理
info = conn.recv(1024).decode("utf-8")
if not info: # 如果客户端正常执行了close,服务器会接收到一个空
del_l.append(conn) # 将已经结束的客户端的conn,添加到要删除的列表中
print('客户端正常退出了!')
conn.close() # 因为客户端已经主动close,所以服务器端的conn也要close
else:
print(info)
conn.send(info.upper().encode('utf-8')) # 给客户端返回数据
except BlockingIOError: # 处理客户端没有数据发送过来
continue
except ConnectionResetError: # 处理客户端强制关闭
continue
if del_l:
for conn in del_l:
l.remove(conn)
del_l = [] # 在删除完主动关闭的客户端的连接之后,应该把此列表清空,否则报错
1
2
3
4
5
6
7
8
9
10
11
12
# client端
import socket
import time


sk = socket.socket()

sk.connect(('127.0.0.1',8080))
time.sleep(3)
sk.send(b'he')
print(sk.recv(10))
sk.close()

非阻塞IO的优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

非阻塞IO的缺点:

  1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况

  2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

所以非阻塞IO是非常不推荐的。

多路复用IO

​ 也称这种IO方式为事件驱动IO(event driven IO)。它的基本原理就是委托select/epoll这个代理function不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的流程如图:

​ 形象的来理解的话,各种recv或者accept相当于是一个个的小区住户,select代理是门卫老大爷,kernel相当于快递站点,住户自己平时有其他事情要做,就委托老大爷帮忙看看快递啥时候到(委托代理去轮询),老大爷也不可能每隔一段时间就跟住户说一下快递的情况,只有等住户的快递到了(内核层接收到外来的数据),老大爷才会根据不同的快递去通知不同的住户来签收快递(代理只是负责轮询和通知,不负责接收数据)。

​ 官方一点来说,当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

​ 和阻塞IO模型相比,多路复用IO模型在整个机制上可能还要更差一点,因为它需要使用两个系统调用(select和recvfrom),而阻塞IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个连接。

1
2
3
4
5
6
7
# select 模块
r,w,x = select.select([],[],[])

# 使用说明:
# 1.传入三个列表参数,存放的都是委托select要监听的对象
# 2.传入的参数分别为 "读"相关的对象(对于客户端来说就是是否有连接请求或者外来数据),"写"相关的对象,"修改"相关的对象
# 3.返回值也为三个列表类型的值,select监听的某一类对象有动静了,就会把该对象所在的列表返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 基于select的网络IO模型 server端
import select
import socket

sk = socket.socket()
sk.bind(("127.0.0.1", 8080))
sk.listen()
rlist = [sk]
del_l = list()

while True:
r, w, x = select.select(rlist, [], []) # 将sk委托给select,让其帮忙监听是否会有连接请求
if r: # 如果有连接请求或者外来数据发送请求,r就会有返回值
for i in r:
if i == sk: # 如果有反应的对象是sk,那么就连接请求的客户端
conn, addr = sk.accept()
rlist.append(conn) # 将连接的客户端继续交给select去监听,监听是否会有数据过来
else: # 如果不是sk有反应,那就是已经连接的客户端有反应了
try:
info = i.recv(1024).decode("utf-8")
if not info: # 接收的数据为空,表示客户端正常断开
del_l.append(i)
i.close()
else:
print(info)
i.send((info.upper().encode('utf-8')))
except ConnectionResetError: # 处理客户端异常断开的情况
continue
if del_l: # 删除那些主动断开连接的客户端的conn
for conn in del_l:
rlist.remove(conn)
del_l.clear()
1
2
3
4
5
6
7
8
9
10
11
12
13
# client端
import socket
import time

sk = socket.socket()
sk.connect(("127.0.0.1", 8080))

while True:
res = input(">>>")
sk.send(res.encode("utf-8"))
time.sleep(1)
info = sk.recv(1024).decode("utf-8")
print(info)

注意:

  1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

  2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

所以,select的优势在于可以处理多个连接,不适用于单个连接

多路复用IO的优点

​ 相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

多路复用IO的缺点

  • 首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
  • 其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

select poll epoll的区别:

  • select 和 poll 有一个共同的机制,都是采用轮训的方式去询问内核,有没有数据准备好了。
  • select有一个最大监听事件的限制,32位机限制1024,6位机限制2048;poll没有,理论上poll可以开启无限大,1G内存大概够你开10W个事件去监听。
  • epoll是最好的,采用的是回调机制,解决了select和poll共同存在的问题,而且epoll理论上也可以开启无限多个监听事件。

异步IO

​ 异步IO模型在几种模型中是最好的,用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

形象的来说,在异步IO中,没有门卫大爷的角色,只有住户(用户进程)和快递员(数据)。住户先打电话给快递员问一下快递有没有到,没有到的话住户就先去做自己的事情了,等快递到了以后,快递员会主动给住户打电话通知取快递。


Python IO多路复用
https://clark-cdc.github.io/2019/06/10/0019-IO多路复用/
作者
clark
发布于
2019年6月10日
许可协议