2021/10/25

asyncio in python3

asyncio 是 python 3.5+ 用來寫 coroutine 的 library,並提供 async, await syntax sugar 語法

coroutine

subroutine 是從某一個進入點到另一個離開的點。coroutine 則是有 entered, exited 及多個 resumed points。可用 async/await 語法實作

如要判斷函數是否為 coroutine

asyncio.iscoroutinefunction(func)

如要判斷物件是否為 coroutine

asyncio.iscoroutine(obj)

sample

python 3.5+ 可使用 asyncio library 以及 await

python 3.7+ 可使用 asyncio.run()

import asyncio
import time

now = lambda: time.time()

async def dosomething(num):
    print('第 {} 任務,第一步'.format(num))
    await asyncio.sleep(2)
    print('第 {} 任務,第二步'.format(num))

if __name__ == "__main__":
    start = now()
    tasks = [dosomething(i) for i in range(5)]
    asyncio.run(asyncio.wait(tasks))
    print('TIME: ', now() - start)

執行結果

第 0 任務,第一步
第 1 任務,第一步
第 4 任務,第一步
第 3 任務,第一步
第 2 任務,第一步
第 0 任務,第二步
第 1 任務,第二步
第 4 任務,第二步
第 3 任務,第二步
第 2 任務,第二步
TIME:  2.0054779052734375

async await

async 用來宣告 funcition 有非同步執行的功能

  • async def 的函數裡面,不能跟 yield, yield from 一起使用

await 是標記 coroutine 可 suspend/resume 的 entry points

  • await 後面必須要是一個 coroutine 物件,或是 awaitable 類型的物件
  • await 的目的是將控制權回傳給 event loop,實現的方法是用 yield

awaitable 特性的物件有三種

  1. coroutines

    一個 async def 函數就是 coroutine

  2. tasks

    tasks 是可被調度 coroutines 的物件,可利用 asyncio.create_task() 包裝 coroutines

  3. futures

    是一個 asynchronous operation,處理完成回傳的結果

event loop

pythojn 3.5 用 asyncio.geteventloop 產生 event_loop,然後將 coroutine 放到 run_until_complete() 裡面,直到所有 coroutine 完成。

import asyncio

async def hello_world(x):
    print('hello_world x' + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world(3))
loop.close()
  • loop.rununtilcomplete(coroutine)
    • 執行 coroutine,完成時,就可以關閉 event loop
  • loop.run_forever()
    • event loop 永遠不會關閉,直到呼叫 loop.stop() 為止

python 3.7+ 有更簡潔寫法,將 loop 封裝,然後用 asyncio.run() 執行就可以了

import asyncio

async def hello_world(x):
    print('hello_world x' + str(x))
    await asyncio.sleep(x)

asyncio.run(hello_world(3))

task

產生 task 的方法有兩種

  • asyncio.ensure_future() 在所有 python 都可以使用
  • asyncio.create_task() 在 python 3.7+ 可使用
import asyncio
import time

async def dosomething(num):
    print('start{}'.format(num))
    await asyncio.sleep(num)
    print('sleep{}'.format(num))

async def main():
    task1 = asyncio.create_task(dosomething(1))
    task2 = asyncio.create_task(dosomething(2))
    task3 = asyncio.create_task(dosomething(3))
    await task1
    await task2
    await task3

if __name__ == '__main__':
    time_start = time.time()
    asyncio.run(main())
    print(time.time() - time_start)

執行結果

start1
start2
start3
sleep1
sleep2
sleep3
3.0059001445770264

同時執行多個 tasks

利用 asyncio.gather() 可同時放入多個 coroutines 或 awaitable 物件進入 event loop

asyncio.gather( *aws, loop=None, return_exceptions=False)

  1. *aws :可傳入多個 awaitable objects
  2. Loop:此參數將會在 Python 3.10 移除
  3. return_exceptions:default 是 False,當發生 exception 時會立即中斷 task,如果設定為 True 則發生錯誤的訊息會與其他成功訊息一起回傳 (最終的 results 結果裡面包含了 ValueError() 結果)
import asyncio
import time

now = lambda: time.time()

async def dosomething(num):
    print('第 {} 任務,第一步'.format(num))
    await asyncio.sleep(2)
    print('第 {} 任務,第二步'.format(num))
    return '第 {} 任務完成'.format(num)

async def raise_error(num):
    raise ValueError
    print('這邊不會執行到')

async def main():
    tasks = [dosomething(i) for i in range(5)]
    tasks1 = [raise_error(i) for i in range(5)]

    results = await asyncio.gather(*tasks, *tasks1, return_exceptions=True)
    print(results)


if __name__ == "__main__":

    start = now()
    asyncio.run(main())
    print('TIME: ', now() - start)

執行結果

第 0 任務,第一步
第 1 任務,第一步
第 2 任務,第一步
第 3 任務,第一步
第 4 任務,第一步
第 0 任務,第二步
第 1 任務,第二步
第 2 任務,第二步
第 3 任務,第二步
第 4 任務,第二步
['第 0 任務完成', '第 1 任務完成', '第 2 任務完成', '第 3 任務完成', '第 4 任務完成', ValueError(), ValueError(), ValueError(), ValueError(), ValueError()]
TIME:  2.0041749477386475

chaining coroutines

可將任務分成三個 coroutines,當第一個完成,會將結果傳入第二個,然後是第三個。最終會透過 asyncio.gather 儲存 coroutine 的結果

import asyncio
import time
import random

now = lambda: time.time()

# 任務一
async def step_one(num, n):
    time_sleep = random.randint(0, 1)
    print('Task {},step one, sleep {}'.format(n, time_sleep))
    await asyncio.sleep(time_sleep)
    print('Task {},step one, wakeup'.format(n))
    num += 1
    return num

# 任務二
async def step_two(num, n):
    time_sleep = random.randint(0, n)
    print('Task {},step two, sleep {}'.format(n, time_sleep))

    await asyncio.sleep(time_sleep)
    print('Task {},step two, wakeup'.format(n))
    num += 2
    return num

# 任務三
async def step_three(num, n):
    time_sleep = random.randint(0, n)
    print('Task {},step three, sleep {}'.format(n, time_sleep))
    await asyncio.sleep(time_sleep)
    print('Task {},step three, wakeup'.format(n))
    num += 3
    return [n, num]

# 任務調度
async def asyncio_chain(n):
    s1 = await step_one(n, n)
    s2 = await step_two(s1, n)
    s3 = await step_three(s2, n)
    return s3

# 收集任務結果
async def main():
    tasks = [asyncio_chain(n) for n in range(3)]
    result = await asyncio.gather(*tasks)
    print(result)


if __name__ == "__main__":

    start = now()
    asyncio.run(main())
    print('TIME: ', now() - start)

執行結果

Task 0,step one, sleep 0
Task 1,step one, sleep 1
Task 2,step one, sleep 1
Task 0,step one, wakeup
Task 0,step two, sleep 0
Task 0,step two, wakeup
Task 0,step three, sleep 0
Task 0,step three, wakeup
Task 1,step one, wakeup
Task 1,step two, sleep 1
Task 2,step one, wakeup
Task 2,step two, sleep 2
Task 1,step two, wakeup
Task 1,step three, sleep 1
Task 2,step two, wakeup
Task 2,step three, sleep 0
Task 2,step three, wakeup
Task 1,step three, wakeup
[[0, 6], [1, 7], [2, 8]]
TIME:  3.0117080211639404

coroutine with queue

將 tasks 分為 producer 與 consumer 兩種角色,但 producer 不直接跟 cosumer 連接,而是透過 queue 傳遞資料。

producers 可任意將多個 task 填入 queue,然後由多個 cosumers 由 queue 取出並執行。

python 標準函式庫有四種 queues

  1. queue.Queue
  2. asyncio.Queue
  3. multiprocessing.Queue
  4. collections.deque

asyncio.Queue 使用時要注意,並不是 thread-safe

  • q = asyncio.Queue(maxsize=num_consumers):限制 queue 的大小
  • q.put()
  • q.get()
  • q.task_done():在完成一項工作之後,向queue發送一個信號
  • q.join():等到queue為空,再執行後續操作
  • q.qsize():回傳 queue size
  • q.empty(): 如果 queue 為空,回傳 True,反之 False
  • q.full(): 如果 queue is full,回傳 True,反之 False
import asyncio

# consumer
async def consumer(q, n):
    print('consumer {}: starting...'.format(n))
    while True:
        print('consumer {}: prepare to get'.format(n))
        item = await q.get()
        print('consumer {}: get {}'.format(n, item))
        await asyncio.sleep(0.15)
        # 完成一項工作,發送訊號給 queue,表示可再容納一個新的 task
        q.task_done()
    print('consumer {}: ending'.format(n))

# producer A
async def producer_a(q, num_products):
    print('producer A: starting producer A...')
    for i in range(num_products):
        await q.put('A ' + str(i))
        print('producer A: create A {}'.format(i))
        await asyncio.sleep(0.01)

# producer B
async def producer_b(q, num_products):
    print('producer B: starting producer B...')
    for i in range(num_products):
        await q.put('B ' + str(i))
        print('producer B: create B {}'.format(i))
        await asyncio.sleep(0.02)

# 任務調度
async def main(num_consumers, num_products):
    # 以 consumers 的數量為 queue size 上限,當 queue 裡面的 items 數量到達上限,就無法再放入 item
    q = asyncio.Queue(maxsize=num_consumers)

    # 產生兩個 producer tasks,每一個 producer 都會產生 num_products 個 products,放入 q
    prod_a = [asyncio.create_task(producer_a(q, num_products))]
    prod_b = [asyncio.create_task(producer_b(q, num_products))]

    # 產生
    consumers = [
        asyncio.create_task(consumer(q, i)) for i in range(num_consumers)
    ]


    # 測試 coroutine 的 cancel
    # await asyncio.sleep(0.2)
    # for c in consumers:
    #     c.cancel()

    await asyncio.gather(*prod_a, *prod_b)
    print('producers All: ending')

    # 表示 queue 裡面已經沒有 product tasks
    await q.join()
    print('consumers All: ending')

    # 以 cancel 確認將所有 consumers 取消,否則 consumer 會一直等待 queue
    for c in consumers:
        c.cancel()

# main(消費者數量, products 數量)
asyncio.run(main(3, 4))

執行結果

producer A: starting producer A...
producer A: create A 0
producer B: starting producer B...
producer B: create B 0
consumer 0: starting...
consumer 0: prepare to get
consumer 0: get A 0
consumer 1: starting...
consumer 1: prepare to get
consumer 1: get B 0
consumer 2: starting...
consumer 2: prepare to get
producer A: create A 1
consumer 2: get A 1
producer B: create B 1
producer A: create A 2
producer A: create A 3
consumer 0: prepare to get
consumer 0: get B 1
consumer 1: prepare to get
consumer 1: get A 2
producer B: create B 2
consumer 2: prepare to get
consumer 2: get A 3
producer B: create B 3
producers All: ending
consumer 0: prepare to get
consumer 0: get B 2
consumer 1: prepare to get
consumer 1: get B 3
consumer 2: prepare to get
consumer 0: prepare to get
consumer 1: prepare to get
consumers All: ending

futures

future 是一個特殊的 awaitable 物件,代表某個 asynchronous operation 的 eventual result

當有一個 Future 物件在 awaited 狀態,表示 coroutine 會一直 wait,直到 Future is resolved

Future 物件用在 async/wait 的 callback-based code

通常在 application level 不需要自行產生 Future object

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

這是自行產生 future 物件的 sample

def mark_done(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)

async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    print('scheduling mark_done')
    # 在 event loop 呼叫 mark_done,mark_done 完成時,
    # 會透過 future 物件的 set_result 將結果傳回來
    loop.call_soon(mark_done, future, 'the result')
    
    print('suspending the coroutine')
    result = await future
    print('awaited result: {!r}'.format(result))
    print('future result: {!r}'.format(future.result()))
    return result

if __name__ == '__main__':
    print('entering the event loop')
    result = asyncio.run(main())
    print('returned result: {!r}'.format(result))

References

[Python爬蟲教學]整合asyncio與aiohttp打造Python非同步網頁爬蟲

Python Asyncio 協程(二)

【Python教學】淺談 Coroutine 協程使用方法

asyncio由簡入繁

python的asyncio模組(一):異步執行的好處

沒有留言:

張貼留言