Python学习——异步IO

前言

学完了python的多线程,终于满心欢喜地去分析lijiejie的扫描器了,结果发现他还用到了协程/(ㄒoㄒ)/~~,没办法,再回来学习一下协程的知识。

学过计组我们都知道,CPU的速度远高于IO设备的速度。每次执行一个任务,CPU执行一部分后需要等待IO设备执行完,CPU再去执行。这样严重拖慢了CPU的效率。这种运行方式就叫同步IO。那什么是异步IO呢?就是CPU再完成了自己一部分工作后,把IO的任务分配给IO设备后就不再过问,继续执行手头的活,等IO设备的任务执行完后,告诉CPU一声,CPU再接下去执行。这样的好处是CPU的运行不会受IO拖累,一直保持高速执行,从而提高整体执行效率。

在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

generator与yield

好吧,在讲生成器之前,先讲一下列表生成式(ps:你咋不从盘古开天辟地开始讲…)

在python中可以很方便地产生一个list,如

1
l = [x * x for x in range(10)]

他将返回一个包含0-9的二次幂的list,但是如果我们要包含100万个元素呢?这将会占用很大的内存。如果我们仅仅需要访问前面几个元素,那后面的内存空间就白白浪费了。

如果列表中的元素可以按某种算法推算出来,那我们就不必在用之前将整个list计算出来,而可以在一边循环的时候一边再计算出具体的值。这种机制叫做生成器。还是上面这个例子,产生一个生成器最简单的方式就是将[]变成()即可将list变为生成器。如

1
g = (x * x for x in range(10))

获取其中的值的方式也很简单,一种是单步获取,通过不断next()函数来计算下一步的值;另一种是通过for循环来获取,和for循环一个list没什么两样。

生成器除了next()方法,还有send(value)和close()方法,send(value)方法会将当前yield表达式的值设置为value。close()方法就是关闭当前生成器。

上面说的是产生一个生成器的列表形式,如果要写成函数的形式呢?生成器有另一种定义方式,如果一个函数中包含yield关键字,那么这个函数就不是普通函数了,而是一个生成器。

简单地说,yield可以等同于return,yield就是 return 返回一个值,并且记住这个返回的位置,下次迭代就从这个位置后开始。

注意:用for循环一个生成器函数,会拿不到函数return的值。如果想要拿到返回值,必须捕获StopIteration错误,返回值包含在StopIterationvalue中,如:

1
2
3
4
5
6
7
8
g = generator(6)
while Ture:
try:
x = next(g)
print('g:',x)
except StopIteration as e:
print('Generator return value', e.value)
break

协程

在多进程和多线程中,我们说它们始终会有数量上限,无论是进程还是线程都不可能无上限并发,达到这个上限后CPU忙于切换,无暇处理真正的任务,导致效率变慢。进程的切换,一切进程需要反复进入内核,置换掉一大堆状态,会带来大量内存的加载。线程的切换,虽然是在同一个进程里,共享一个地址空间,不用麻烦地切换内存,刷新TLB,只需要把寄存器刷新一遍,大大减小了进程切换带来的资源开销。但这些操作依然是由系统来调度,如果我们可以由用户态来实现整个程序的逻辑流,就避免了系统来进行任务调度,由用户态来实现的任务调度机制就是协程。是在一个线程之内实现任务调度。

和多线程比,协程最大的优势就是极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

来看一个协程的生产者-消费者例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'

def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()

c = consumer()
produce(c)

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

这段程序定义了两个函数,一个生产者,一个消费者,先产生一个消费者,作为参数传给生产者。这个生产者函数还有yield关键字,所以是一个生成器。首先执行生产者函数,调用c.send(None)调用了consumer(),这里我们用pdb来debug一下这个程序,看一下整个执行过程

执行过程1

call consumer(),走consumer()的程序,进入while循环后,yield返回r的值,n也被赋值为None,由于满足下面的if条件,程序被return到produce()

执行过程2

在produce()中进入while循环,对n进行了加1,打印一串字符串后继续send,调用consumer(),此时传入的n为1

image-20210219185234317

在回到consumer()后,注意是从上一次yield的地方开始执行,而不是从头执行,此时n为1,不满足if条件,所以不会被return,print后r被赋值为200ok,再执行到yield后返回到produce()并传回响应。接下来就是循环出来结束程序。建议自己去调试一下感受一下整个过程。

image-20210219185901228

整个流程无锁,由一个线程执行,produceconsumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

asyncio

async和await的前生是基于生成器的协程。原本中间还想写一下用原始的生成器和yield from,但没必要了。从python3.10版本开始就会废弃这种模式。所以现在直接使用async和await就可以了,还简化了流程。注意,async和await在3.5及以后才可以使用。

async与await

async 定义一个协程,await 用来挂起阻塞方法的执行。先来看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

运行结果

1
2
3
4
started at 17:13:52
hello
world
finished at 17:13:55

使用非常简单,在异步函数前加上async关键字,在费时间的操作前加上await关键字,就可以了。那这样是按程序顺序执行了两个say_after()函数,总的执行时间还是3秒

并发执行

那么如何并发执行呢。答案是可以用asyncio.create_task()函数,该函数可以接受一个协程,并返回一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")

await task1
await task2

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

运行结果

1
2
3
4
started at 17:14:32
hello
world
finished at 17:14:34

可以看到,并发的执行时间几乎与执行时间最长的任务的执行时长相等。原先同步方式需要3秒,现在只需要2秒。

可等待对象

我们注意到这里有一个任务的概念,在python中被称为可等待对象,如果一个对象可以在await语句中使用,那么这个对象就是可等待对象。python中的可等待对象除了任务,还有协程Future

task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

Future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别

并发运行任务

有朋友可能会奇怪,上面不是讲过并发了吗?是的,只是这里用gather()方法对任务的处理做了简化。

先看一个计算阶乘的例子,用原先的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def factorial(name, num):
"""docstring for factorial"""
f = 1
for i in range(2, num + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({num}) = {f}")

async def main():
"""docstring for main"""
task1 = asyncio.create_task(factorial("A",2))
task2 = asyncio.create_task(factorial("B",3))
task3 = asyncio.create_task(factorial("C",4))

await task1
await task2
await task3

asyncio.run(main())

执行结果

1
2
3
4
5
6
7
8
9
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

现在改用gather(*aws, loop=None, return_exceptions=False),参数aws可以直接接受一个协程,将自动转为任务。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")

async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)

asyncio.run(main())

执行结果与上面的程序一致

协程爬虫

弄明白了协程的概念,学会了简单的协程并发编程,我们可以尝试用上面的知识写一个小爬虫。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import requests
import time

URLS = ['https://www.baidu.com/',
'https://www.sina.com/',
'https://www.qq.com/',
'https://www.bilibili.com',
'https://www.zhihu.com']

async def load_url(url):
"""docstring for load_url"""
print(f"loading {url}...")
res = await requests.get(url)
sc = res.status_code
print(f"res {url}:{sc}")

async def main():
"""docstring for main"""
tasks = []
for url in URLS:
task = load_url(url)
tasks.append(task)
await asyncio.gather(*tasks)

start = time.time()
asyncio.run(main())
print("spend time: %.2f seconds." % (time.time() - start))

执行结果报错了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
loading https://www.baidu.com/...
loading https://www.sina.com/...
loading https://www.qq.com/...
loading https://www.bilibili.com...
loading https://www.zhihu.com...
Traceback (most recent call last):
File "crawl_async.py", line 27, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "crawl_async.py", line 24, in main
await asyncio.gather(*tasks)
File "crawl_async.py", line 14, in load_url
res = await requests.get(url,timeout=timeout)
TypeError: object Response can't be used in 'await' expression

最后一行是报错的原因,await不能用在response对象上,根据官方文档说明,await 后面的对象必须是如下格式之一:

  • 一个原生 coroutine 对象。
  • 一个由 types.coroutine() 修饰的生成器,这个生成器可以返回 coroutine 对象。
  • 一个包含 __await 方法的对象返回的一个迭代器。

可以参见:https://www.python.org/dev/peps/pep-0492/#await-expression

reqeusts 返回的 Response 不符合上面任一条件,因此就会报上面的错误了。

那如果将requests.get()方法包裹成 coroutine 对象,试试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import requests
import time

URLS = ['https://www.baidu.com/',
'https://www.sina.com/',
'https://www.qq.com/',
'https://www.bilibili.com',
'https://www.zhihu.com']

async def get(url):
return requests.get(url)

async def load_url(url):
"""docstring for load_url"""
print(f"loading {url}...")
res = await get(url)
sc = res.status_code
print(f"res {url}:{sc}")

async def main():
"""docstring for main"""
tasks = []
for url in URLS:
task = load_url(url)
tasks.append(task)
await asyncio.gather(*tasks)

start = time.time()
asyncio.run(main())
print("spend time: %.2f seconds." % (time.time() - start))

执行结果

1
2
3
4
5
6
7
8
9
10
11
loading https://www.baidu.com/...
res https://www.baidu.com/:200
loading https://www.sina.com/...
res https://www.sina.com/:200
loading https://www.qq.com/...
res https://www.qq.com/:200
loading https://www.bilibili.com...
res https://www.bilibili.com:200
loading https://www.zhihu.com...
res https://www.zhihu.com:403
spend time: 1.98 seconds.

能正常执行,但看这个输出,并没有按我们预期的异步并发执行,也就是说我们仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。

aiohttp

官方文档链接为:aiohttp,它分为两部分,一部分是 Client,一部分是 Server,详细的内容可以参考官方文档。

下面我们将 aiohttp 用上来,将代码改成如下样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import aiohttp
import time

URLS = ['https://www.baidu.com/',
'https://www.sina.com/',
'https://www.qq.com/',
'https://www.bilibili.com',
'https://www.zhihu.com']


async def load_url(url):
"""docstring for load_url"""
print(f"loading {url}...")
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
print(f"res {url}:{res.status}")

async def main():
"""docstring for main"""
tasks = []
for url in URLS:
task = load_url(url)
tasks.append(task)
await asyncio.gather(*tasks)

start = time.time()
asyncio.run(main())
print("spend time: %.2f seconds." % (time.time() - start))

执行结果

1
2
3
4
5
6
7
8
9
10
11
loading https://www.baidu.com/...
loading https://www.sina.com/...
loading https://www.qq.com/...
loading https://www.bilibili.com...
loading https://www.zhihu.com...
res https://www.qq.com/:200
res https://www.sina.com/:200
res https://www.baidu.com/:200
res https://www.zhihu.com:200
res https://www.bilibili.com:200
spend time: 0.46 seconds.

看到这个执行结果,终于心里松了口气,异步了。

再看到这个执行时间,心里不禁:woc,牛逼。。比之前快了好几倍。

与多线程、多进程对比

我们把上面的代码先改成多线程的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing.dummy import Pool as ThreadPool
import requests as r
import time

URLS = ['https://www.baidu.com/',
'https://www.sina.com/',
'https://www.qq.com/',
'https://www.bilibili.com',
'https://www.zhihu.com']

def load_url(url):
print(f"loading {url}...")
with r.get(url) as conn:
print(f"res {url}:{conn.code}")

pool = ThreadPool(processes = 50)
start = time.time()
results = pool.map(load_url,URLS)
print("spend time: %.2f seconds." % (time.time() - start))
pool.close()
pool.join()

执行结果

1
2
3
4
5
6
7
8
9
10
11
loading https://www.baidu.com/...
loading https://www.sina.com/...
loading https://www.qq.com/...
loading https://www.bilibili.com...
loading https://www.zhihu.com...
res https://www.sina.com/:200
res https://www.baidu.com/:200
res https://www.qq.com/:200
res https://www.zhihu.com:403
res https://www.bilibili.com:200
spend time: 0.54 seconds.

可以看到开了个线程依然没有协程快

我们再把代码改成多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Pool
import requests as r
import time

URLS = ['https://www.baidu.com/',
'https://www.sina.com/',
'https://www.qq.com/',
'https://www.bilibili.com',
'https://www.zhihu.com']

def load_url(url):
"""docstring for load_url"""
print(f"loading {url}...")
with r.get(url) as conn:
print(f"res {url}:{conn.status_code}")

pool = Pool(50)
start = time.time()
for url in URLS:
pool.apply_async(load_url, args=(url,))
pool.close()
pool.join()
print("spend time: %.2f seconds." % (time.time() - start))

执行结果

1
2
3
4
5
6
7
8
9
10
11
loading https://www.baidu.com/...
loading https://www.sina.com/...
loading https://www.qq.com/...
loading https://www.bilibili.com...
loading https://www.zhihu.com...
res https://www.sina.com/:200
res https://www.qq.com/:200
res https://www.baidu.com/:200
res https://www.zhihu.com:403
res https://www.bilibili.com:200
spend time: 1.33 seconds.

比多线程还慢,不得不说,协程最快,消耗的资源还少

根据上面的协程小爬虫,在安全开发中,也可以改装成扫描器

参考

async/await 实现协程


Python学习——异步IO
https://wanf3ng.github.io/2021/02/16/Python学习——异步IO/
作者
wanf3ng
发布于
2021年2月16日
许可协议