1. 引言
在博文中<python 基础知识梳理——Python中的多进程和多线程>,我们还没有讲如何通过生成器来实现Python的协程。
协程是一种实现并发编程的方式,当然多进程/多线程也是解决并发的手法,但是当同时连接到服务器的客户端达到一定的量级,进程的上下文切换占用了大量的资源,线程也顶不住如此巨大的压力了,此时我们就需要一个调度器来对任务进行调度,节省多线程中启动线程、管理线程、同步锁等各种开销。Nginx,在高并发下能够保持低资源、低消耗、高性能就是依赖调度器(例如:轮询算法)。
在Python中,使用生成器实现协程在Python2中常见,在Python3.7及之后的版本中,提供了新的基于asyncio和async/await的方法,鉴于现在已经2020[::-1]
年(笑),我们从Python的新特性来讲新的协程。
2. 协程的实现
2.1 例子1:爬虫
%time
为jupyter notebook
中ipython
解释器的语法糖,用于测试语句运行时间。
4个任务共耗时10秒,接下来我们用协程实现并发来优化一下,提高效率。
import asyncio
async def get_page(url):
print('acquire page {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('ok {}'.format(url))
async def main(urls):
for url in urls:
await get_page(url)
asyncio.run(main(['url_1','url_2','url_3','url_4']))
# 输出
acquire page url_1
ok url_1
acquire page url_2
ok url_2
acquire page url_3
ok url_3
acquire page url_4
ok url_4
Wall time: 10 s
在Python3.7之后,协程写异步程序非常简单,大部分协程用到的魔法方法都被
asyncio
库包含了,我们只需要在函数中用async
修饰词声明异步函数,再用await
调用即可。
2.2 我们来理一下思路:
首先,在例子中,我们使用import asyncio
导入包,然后用async
声明了get_page()
和 main()
为异步函数,当我们调用异步函数时,我们就会得到一个协程对象。
我们声明了异步函数后,就需要对异步函数进行调用,常用的协程执行方法有3种:
-
我们可以通过
await
来调用,await
执行的效果和Python正常执行的效果是一样的,程序执行后阻塞在这里,进入被调用的协程函数,执行完毕返回后再继续,这也是await
的意思。await asyncio.sleep(sleep_time)
的意思是在这里休息数秒,await get_page(url)
则表示执行get_page()函数。 -
我们还可以用
asyncio.create_task()
来创建任务,后续可能会写一篇博文详细整理一下并发编程,这里我们先略过。 -
最后,通过
asyncio.run
来触发运行,asyncio.run
这个函数可以非常简单的调用协程,不用关注协程中的事件循环问题,使用方法参考源码中的实例。Example: async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main())
我们发现,运行时间还是10秒?这是怎么回事呢?await
是同步调用,因此,get_page(url)
并不会在当前的调用结束后触发下一次调用,相当于用异步接口写了一个同步代码。
下面,我们用asyncio.create_task()
来创建任务,实现异步。
import asyncio
async def get_page(url):
print('acquire page {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('ok {}'.format(url))
async def main(urls):
tasks = [asyncio.create_task(get_page(url)) for url in urls]
for task in tasks:
await task
asyncio.run(main(['url_1','url_2','url_3','url_4']))
# 输出
acquire page url_1
acquire page url_2
acquire page url_3
acquire page url_4
ok url_1
ok url_2
ok url_3
ok url_4
Wall time: 3.66 s
很明显,对比输出结果,四个任务几乎是同时创建的,任务创建后很快就被调度执行,任务代码也不会阻塞在这里,所以我们要等待所有任务都结束才执行,用for task in tasks:await task
即可。
很显然,对比多线程,协程的写法更清晰且一目了然,对于task
任务,其实还有一种写法,我们来看一看:
import asyncio
async def get_page(url):
print('acquire page {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('ok {}'.format(url))
async def main(urls):
tasks = [asyncio.create_task(get_page(url)) for url in urls]
await asyncio.gather(*tasks)# 一个解包操作
asyncio.run(main(['url_1','url_2','url_3','url_4']))
# 输出
acquire page url_1
acquire page url_2
acquire page url_3
acquire page url_4
ok url_1
ok url_2
ok url_3
ok url_4
Wall time: 3.66 s
2.3 总结
相比之前的代码,多了一个*tasks
的解包操作,将列表变为了函数的参数;**tasks
则是将字典变成了函数的参数
相比python2
中的yield
创建协程,python3.7
后提供的asyncio.create_task()
、asyncio.run()
、await
相比于旧接口更容易理解和阅读,不需要关注内部实现,更关注代码本身(写着写着感觉更像numpy
和pytorch
的感觉了,哈哈哈哈)
3. 协程的底层实现
3.1 例子2
import asyncio
async def work_1():
print('work 1 start ')
await asyncio.sleep(1)
print('work 1 is done!')
async def work_2():
print('work 2 start ')
await asyncio.sleep(2)
print('work 2 is done')
async def main():
print('before await ')
await work_1()
print('awaited work_1')
await work_2()
print('awaited work_2')
asyncio.run(main())
# 输出
before await
work 1 start
work 1 is done!
awaited work_1
work 2 start
work 2 is done
awaited work_2
3.2 例子3
import asyncio
async def work_1():
print('work 1 start ')
await asyncio.sleep(1)
print('work 1 is done!')
async def work_2():
print('work 2 start ')
await asyncio.sleep(2)
print('work 2 is done')
async def main():
task1 = asyncio.create_task(work_1())
task2 = asyncio.create_task(work_2())
print('before await ')
await task1
print('awaited work 1')
await task2
print('awaited work 2')
asyncio.run(main())
# 输出
before await
work 1 start
work 2 start
work 1 is done!
awaited work 1
work 2 is done
awaited work 2
例子2和例子3中的执行顺序,是不是有些不一样呢?
asyncio.run(main())
表示程序进入main()函数,事件循环开始;- task1和task2任务被创建,进入事件循环等待,然后
print('before await ')
; await task1
执行,用户选择从当前的主任务中切出,事件调度器开始调度work_1;- work_1开始执行,运行
print('work 1 start ')
,然后运行await asyncio.sleep(1)
,从当前任务切出,事件调度器开始调度work_2; - work_2开始运行,运行
print('work 2 start ')
,然后运行await asyncio.sleep(2)
,从当前任务切出; - 以上所有的事件的运行时间,都应该在1ms~10ms,甚至更短,事件调度器从这个时候开始暂停调度;
- 一秒钟后,work_1的sleep结束,事件调度器将控制权重新交给task_1,输出
work 1 is done!
,task_1任务完成,从事件循环中退出; await task1
完成,事件调度器将控制器传给主任务,输出awaited work 1
,然后在await task2处等待;- 两秒钟后,work_2的sleep结束,事件调度器将控制权重新传给task_2,输出
work 2 is done!
,task_2任务完成,从事件循环中退出; - 主任务输出
awaited work 2
,协程任务完成,事件循环结束。
3.3 超时任务
假如我们在Python中配置爬虫,那么当爬取一条任务时候出错该怎么办呢?最简单的应该是超时取消,又该怎么做呢?
import asyncio
async def work_1():
await asyncio.sleep(1)
return 1
async def work_2():
await asyncio.sleep(2)
return 2/0
async def work_3():
await asyncio.sleep(3)
return 3
async def main():
task_1 = asyncio.create_task(work_1())
task_2 = asyncio.create_task(work_2())
task_3 = asyncio.create_task(work_3())
await asyncio.sleep(2)
task_3.cancel()
res = await asyncio.gather(task_1,task_2,task_3,return_exceptions=True)
print(res)
asyncio.run(main())
# 输出
[1, ZeroDivisionError('division by zero'), CancelledError()]
上述例子中,work_1工作正常,work_2运行中出现错误,work_3执行时间过长被我们cancel掉了,这些信息被返回到res中并被打印出来了,其中我们设置了return_exceptions=True
,如果不设置为True
,那么我们就是必须捕获异常,也就无法往下继续执行了。
3.4 生产者消费者模型
import asyncio
import random
async def consumer(queque,id):
while True:
val = await queque.get()
print('{} get a val: {} '.format(id,val))
await asyncio.sleep(1)
async def producer(queue,id):
for i in range(5):
val = random.randint(1,10)
await queue.put(val)
print('{} put a val : {}'.format(id,val))
await asyncio.sleep(1)
async def main():
queue = asyncio.Queue()
consumer_1 = asyncio.create_task(consumer(queue,'consumer_1'))
consumer_2 = asyncio.create_task(consumer(queue,'consumer_2'))
producer_1 = asyncio.create_task(producer(queue,'producer_1'))
producer_2 = asyncio.create_task(producer(queue,'producer_2'))
await asyncio.sleep(10)
consumer_1.cancel()
consumer_2.cancel()
await asyncio.gather(consumer_1,consumer_2,producer_1,producer_2,return_exceptions=True)
asyncio.run(main())
# 输出
producer_1 put a val : 1
producer_2 put a val : 1
consumer_1 get a val: 1
consumer_2 get a val: 1
producer_1 put a val : 2
producer_2 put a val : 2
consumer_1 get a val: 2
consumer_2 get a val: 2
producer_1 put a val : 6
producer_2 put a val : 10
consumer_1 get a val: 6
consumer_2 get a val: 10
producer_1 put a val : 8
producer_2 put a val : 2
consumer_1 get a val: 8
consumer_2 get a val: 2
producer_1 put a val : 9
producer_2 put a val : 1
consumer_1 get a val: 9
consumer_2 get a val: 1
4. 总结
- 协程和多线程的区别:①协程是单线程的;②协程由用户决定在什么时候交出控制权,切换到下一个任务
- 在Python3.7版本后,协程的写法更加简单,结合库
asyncio
中的async/await
和create_task
,对中小级别的并发编程已经毫无压力 - 协程的使用,什么时候暂停等待I/O,什么时候需要执行到底,需要有一个事件循环的概念
Q.E.D.