1.协程的概念

首先说什么是协程,协程就是用户态轻量级的线程, 是在一个线程内部实现的,它的开销要比线程小。

协程的在多任务处理时非抢占式的, 这种情况下协程会主动交出控制权

函数(子程序)在执行时都时层级调用的,比如A调用B, B调用C, C执行完毕返回,B执行完毕返回,最后A执行完毕返回,所以函数调用时通过栈来实现的, 调用顺寻明确。

而协程不是这样的, 协程看上去也是子程序, 但是协程在执行过程中,内部可能中断, 然后转而执行别的子程序,在适当的时候在返回来执行

2.协程的优势
  • 没有锁等待,协程逻辑上只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不需要加锁,仅需要判断状态就可以了
  • 执行效率高,函数(子程序)的切换不是线程的切换,没有线程切换所需的开销
3.协程的使用场景
  • 一个线程内的多个协程是串行执行的, 不能利用多核, 所以,协程不适合CPU密集型的场景, 适合I/O密集型的场景
  • I/O相对于CPU而言是阻塞型的, 当一个I/O密集型的程序在操作I/O的时候,CPU实际上是空闲的
4.协程使用实例

Python2使用生成器实现协程,Python 3.7 提供了基于aysncioasync/await的方法

模拟爬虫

  • 传统方法:

    1
    import time
    2
    3
    def crawl_page(url):
    4
        print('crawling {}'.format(url))
    5
        sleep_time = int(url.split("_")[-1])
    6
        time.sleep(sleep_time)
    7
        print("OK {}".format(url))
    8
    9
    def main(urls):
    10
        for url in urls:
    11
            crawl_page(url)
    12
    13
    14
    if __name__ == "__main__":
    15
        main(["url_1", "url_2", "url_3", "url_4"])

    output

    1
    crawling url_1
    2
    OK url_1
    3
    crawling url_2
    4
    OK url_2
    5
    crawling url_3
    6
    OK url_3
    7
    crawling url_4
    8
    OK url_4
  • 使用协程

    1
    import asyncio
    2
    3
    4
    async def crawl_page(url):
    5
        print('crawling {}'.format(url))
    6
        sleep_time = int(url.split("_")[-1])
    7
        await asyncio.sleep(sleep_time)
    8
        print("OK {}".format(url))
    9
    10
    async def main(urls):
    11
        tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    12
        for task in tasks:
    13
            await task
    14
    15
    if __name__ == "__main__":
    16
        asyncio.run(main(["url_1", "url_2", "url_3", "url_4"]))

    output

    1
    crawling url_1
    2
    crawling url_2
    3
    crawling url_3
    4
    crawling url_4
    5
    OK url_1
    6
    OK url_2
    7
    OK url_3
    8
    OK url_4

    tasks是解包列表, 将列表变成函数的参数, 对应的*dict 将字典变成函数的参数

5. 协程运行时
1
import asyncio
2
3
async def worker_1():
4
    print('worker_1 start')
5
    await asyncio.sleep(1)
6
    print('worker_1 done')
7
8
async def worker_2():
9
    print('worker_2 start')
10
    await asyncio.sleep(2)
11
    print('worker_2 done')
12
13
async def main():
14
    print('before await')
15
    await worker_1()
16
    print('awaited worker_1')
17
    await worker_2()
18
    print('awaited worker_2')
19
20
%time asyncio.run(main())
21
22
########## 输出 ##########
23
24
before await
25
worker_1 start
26
worker_1 done
27
awaited worker_1
28
worker_2 start
29
worker_2 done
30
awaited worker_2
31
Wall time: 3 s
1
import asyncio
2
3
async def worker_1():
4
    print('worker_1 start')
5
    await asyncio.sleep(1)
6
    print('worker_1 done')
7
8
async def worker_2():
9
    print('worker_2 start')
10
    await asyncio.sleep(2)
11
    print('worker_2 done')
12
13
async def main():
14
    task1 = asyncio.create_task(worker_1())
15
    task2 = asyncio.create_task(worker_2())
16
    print('before await')
17
    await task1
18
    print('awaited worker_1')
19
    await task2
20
    print('awaited worker_2')
21
22
%time asyncio.run(main())
23
24
########## 输出 ##########
25
26
before await
27
worker_1 start
28
worker_2 start
29
worker_1 done
30
awaited worker_1
31
worker_2 done
32
awaited worker_2
33
Wall time: 2.01 s

第二段代码步骤如下:

  • asyncio.run(main()), 程序进入main()函数, 事件循环开启;
  • task1taks2任务被创建,并进入事件循环等待运行,运行到print, 输出'before await';
  • await task1执行,用户选择从当前的主任务中切出,事件调度器开始调度worker_1;
  • worker_1开始运行, 运行print输出'worker_1 start ', 然后运行到await asyncio.sleep(1), 从当前任务切出,事件调度器开始调度worker_2;
  • worker_2 开始运行, 运行print 输出'worker_2 start',然后运行await asyncio.sleep(2)从当前任务切出;
  • 以上所有事件的运行事件,都应该在1ms10ms之前, 甚至更短,事件调度器从这个时候开始暂停调度
  • 1s后,worker_1的sleep完成,事件调度器将控制权重新传给task_1, 输出'worker_1 done', task1完成任务,从事件循环中退出;
  • await task1 完成, 事件调度器将控制器传给主任务, 输出'awaited worker_1' , 然后在await task2 处继续等待
  • 2s后,worker_2sleep完成,事件调度器将控制权重新传给task_2, 输出'worker_2 done'task_2完成任务, 从事件循环中退出
  • 主任务输出'awaited worker_2', 协程全任务结束, 事件循环结束

错误处理

1
import asyncio
2
3
async def worker_1():
4
    await asyncio.sleep(1)
5
    return 1
6
7
async def worker_2():
8
    await asyncio.sleep(2)
9
    return 2 / 0
10
11
async def worker_3():
12
    await asyncio.sleep(3)
13
    return 3
14
15
async def main():
16
    task_1 = asyncio.create_task(worker_1())
17
    task_2 = asyncio.create_task(worker_2())
18
    task_3 = asyncio.create_task(worker_3())
19
20
    await asyncio.sleep(2)
21
    task_3.cancel()
22
23
    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
24
    print(res)
25
26
%time asyncio.run(main())
27
28
########## 输出 ##########
29
30
[1, ZeroDivisionError('division by zero'), CancelledError()]
31
Wall time: 2 s