1. 并发、并行、同步、异步、阻塞、非阻塞
- 并发:一个时间段内在同一个 cpu 上有多个程序在运行,但任意时刻只有一个程序在运行
- 并行:任意时刻多个程序同时运行在多个 cpu 上
- 同步:代码调用 IO 操作时,必须等待 IO 操作完成才返回
- 异步:代码调用 IO 操作时,不必等待 IO 操作完成才返回
- 阻塞:调用函数时,当前线程被挂起
- 非阻塞:调用函数时,单曲线程不会被挂起,而是立即返回
2. C10K 问题和 IO 多路复用
2.1 C10K问题
2.2 Linux 五种 IO 模型
Linux IO 模型
2.3 select + 回调 + 事件循环模拟 http 请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
| import selectors
import socket
import time
import urllib.parse
selector = selectors.DefaultSelector() # 根据不同平台自动选择最好的 io 多路复用方式
urls = ['http://www.baidu.com'] * 20
stop = False
class Fetcher:
def __init__(self):
self.host = None
self.path = None
self.client = None
self.cur_url = None
self.data = b''
def get_url(self, url):
self.cur_url = url
url = urllib.parse.urlparse(url)
self.host = url.netloc
self.path = url.path if url.path != '' else '/'
# 建立 socket 连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)
try:
self.client.connect((self.host, 80))
except BlockingIOError as e:
pass
# 把 socket 注册到 selector 上
selector.register(self.client.fileno(), selectors.EVENT_WRITE, self.connected) # 回调模式,当这个 socket 上可写时,调用
def connected(self, key):
selector.unregister(key.fd)
self.client.send(
'GET {} HTTP/1.1\r\nHost:{}\r\nConnection:Close\r\n\r\n'.format(self.path, self.host).encode('utf8'))
selector.register(self.client.fileno(), selectors.EVENT_READ, self.readable)
def readable(self, key): # 准备好一段读一段,该函数可能有多次 EVENT_READ 多次被调用
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode('utf8')
html = data.split('\r\n\r\n')[1]
print(html)
self.client.close()
urls.remove(self.cur_url)
if not urls:
global stop
stop = True
def loop():
# 事件循环,不停的请求 socket 的状态,并调用对应的回调函数
# twisted、gevent、asyncio 本质上来讲都是这种模式:回调 + 事件循环 + select\poll\epoll
# 1. select 本身不支持 register 模式(回调)的
# 2. socket 状态变化以后的回调是由我们的程序完成的
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
def get_url(url):
url = urllib.parse.urlparse(url)
host = url.netloc
path = url.path if url.path != '' else '/'
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
try:
client.connect((url.netloc, 80))
except BlockingIOError as e:
pass
while True:
try:
client.send('GET {} HTTP/1.1\r\nHost:{}\r\nConnection:Close\r\n\r\n'.format(path, host).encode('utf8'))
except OSError as e:
pass
else:
break
data = b''
while True:
try:
d = client.recv(1024)
except BlockingIOError as e:
continue
if d:
data += d
else:
break
data = data.decode('utf8')
html = data.split('\r\n\r\n')[1]
print(html)
client.close()
if __name__ == '__main__':
# 异步
start = time.time()
for url in urls:
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print('select:', time.time() - start)
# 同步
urls = ['http://www.baidu.com'] * 20
start = time.time()
for url in urls:
get_url(url)
print('for :', time.time() - start)
|
2.4 回调之痛
问题:
- 异常不由主函数捕获,需要在 loop 中处理,难以处理
- 嵌套回调,层数多了难以理解和维护,如果某一层出异常,难以处理
- 变量在回调间共享难以维护
总结:
3. 协程
3.1 C10M 问题
随互联网发展,C10K 都不够用了
3.2 协程
要实现线程内切换,需要可暂停的函数,并且可以在适当的时候恢复以继续执行
协程:可暂停的函数,可以向暂停的地方传入值
3.3 生成器高级特性
启动生成器的方法:
其他方法:
gen.close()
:关闭生成器gen.throw()
:向上次暂停的地方传入异常
return 值:
- 运行到 return 语句会抛出
StopIteration
异常,e.value
就是返回值
3.4 yield from
itertools.chan()
可以连接多个 Iterable 对象
yield from [sub-generator | iterable]
会在调用方和子生成器之间建立一个双向通道:
next()
会从子生成器 yield 出一个值send()
会发送到子生成器throw()
会发送到子生成器- 子生成器的 return 值会返回到 yield from 所在行,赋值给左边(把子生成器抛出的
StopIteration
异常中的 value
复制给左边)。
3.5 async 和 await
python 为了语义明确,就引入了 async 和 await 关键词用于定义原生协程