天天看點

async 異步協程進階

協程通過 async/await 文法進行聲明,是編寫異步應用的推薦方式

例如新定義一個協程(coroutine object):

async def foo():
    return 42      

首先先來介紹下:

認識aysn和asyncio都有哪些函數方法:

建立一個future 對象:

task = asyncio.create_task(foo())
或者使用      
task=asyncio.ensure_future(foo())
那麼如何判斷建立的task到底是不是future 對象呢?      
async def exetask():
    # task = asyncio.create_task(foo())
    task = asyncio.ensure_future(foo())
    if isinstance(task,asyncio.Future):
        print("yes")
    else:
        print("no")
    s=await task      
asyncio.run(exetask())
結果如下:      

  yes

答案是肯定的。

2.如何運作一個協程:      

要真正運作一個協程,asyncio 提供了三種主要機制:

第一種:

  • ​​asyncio.run()​​ 函數用來運作最高層級的入口點 "main()" 函數 (參見上面的示例。)

第二種:

通過loop對象實作:

loop=asyncio.get_event_loop()      
async  def  jobs():
      i=10
      while i>0:
          # time.sleep(0.5)
          i=i-1
      return 0      
tasks=[asyncio.ensure_future(jobs(k)) for k in range(1,4)]      
res=loop.run_until_complete(asyncio.gather(*tasks))
print(res)
loop.close()      

3.并發

第1種并發運作:
當一個協程通過 asyncio.create_task() 等函數被打包為一個 任務,asyncio.create_task(coro)将 coro 協程 打包為一個 Task 排入日程準備執行,
傳回 Task 對象。此函數 在 Python 3.7 中被加入。在 Python 3.7 之前,of course你也可以改用低層級的 asyncio.ensure_future() 函數, This works in all Python versions but is less readable      
#### ordinal job async
async  def  jobs():
      i=10
      while i>0:
          # time.sleep(0.5)
          i=i-1
      return 0
async def  get_status():
     r=await jobs()
     return r      
loop=event=asyncio.get_event_loop()
tasks = [asyncio.create_task(get_status()) for k in range(1,4)]
for task in tasks:
    r=loop.run_until_complete(asyncio.wait_for(task,timeout=10))
    print(r)      

  這裡我采用了比較低級的loop事件對象來調用run_until_complete() 方法來實作:

事實上開發者一般更喜歡采用進階用法asyncio.wait()或者asyncio.wait_for()來實作這寫異步任務調用:

在進行下面介紹之前我想你應該先了解:​

​asyncio.​

​​

​wait_for​

​(aw, timeout, *, loop=None)

  等待 aw ​​可等待對象​​ 完成,指定 timeout 秒數後逾時。

  如果 aw 是一個協程,它将自動作為任務加入日程。

  timeout 可以為 ​

​None​

​,也可以為 float 或 int 型數值表示的等待秒數。如果 timeout 為 ​

​None​

​,則等待直到完成。

  如果發生逾時,任務将取消并引發 ​​asyncio.TimeoutError​​.

  要避免任務 ​​取消​​​,可以加上 ​​shield()​​。

  函數将等待直到目标對象确實被取消,是以總等待時間可能超過 timeout 指定的秒數。

  如果等待被取消,則 aw 指定的對象也會被取消。

  loop 參數已棄用,計劃在 Python 3.10 中移除。

​asyncio.​

​​

​wait​

​(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并發運作 aws 指定的 ​​可等待對象​​ 并阻塞線程直到滿足 return_when 指定的條件。

如果 aws 中的某個可等待對象為協程,它将自動作為任務加入日程。直接向 ​

​wait()​

​​ 傳入協程對象已棄用,因為這會導緻 ​​令人迷惑的行為​​。

傳回兩個 Task/Future 集合: ​

​(done, pending)​

​。

用法:

done, pending = await asyncio.wait(aws)      

loop 參數已棄用,計劃在 Python 3.10 中移除。

如指定 timeout (float 或 int 類型) 則它将被用于控制傳回之前等待的最長秒數。

請注意此函數不會引發 ​​asyncio.TimeoutError​​。當逾時發生時,未完成的 Future 或 Task 将在指定秒數後被傳回。

return_when 指定此函數應在何時傳回。它必須為以下常數之一:

常數 描述

​FIRST_COMPLETED​

函數将在任意可等待對象結束或取消時傳回。

​FIRST_EXCEPTION​

函數将在任意可等待對象因引發異常而結束時傳回。當沒有引發任何異常時它就相當于 ​

​ALL_COMPLETED​

​。

​ALL_COMPLETED​

函數将在所有可等待對象結束或取消時傳回。

與 ​​wait_for()​​​ 不同,​

​wait()​

​ 在逾時發生時不會取消可等待對象。

如何驗證wait 不取消,wait_for 取消aw對象呢:

我們來看個例子:

先看wait_for:

async def foo(k=0):
    await asyncio.sleep(30)
    return k

async def exetask():
    task=asyncio.create_task(foo(k=1))
    try:
      await asyncio.wait_for(task,timeout=1)
      print(task.cancelled())
    #3.7 改為當 aw 因逾時被取消,wait_for 會等待 aw 被取消,3.7之前直接報異常,
    except asyncio.TimeoutError:
        print("timeout ")
        # task.cancel()
        print(task.cancelled())
asyncio.run(exetask())
結果:      

C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py

timeout

True

再看wait,這裡注意由于waitexpect a list of futures, not Task,我換種寫法:

async def foo(k=0):
    await asyncio.sleep(30)
    return k


async def exetask():
    task = asyncio.create_task(foo(k=1))
    try:
        #請注意wait函數不會引發 asyncio.TimeoutError
        await asyncio.wait([task], timeout=1)
        ## 3.7 改為當 aw 因逾時被取消,wait_for 會等待 aw 被取消,3.7之前直接報異常,
        # await asyncio.wait_for(task,timeout=1)

    except asyncio.TimeoutError:
        print("timeout ")
    print(task.cancelled())


asyncio.run(exetask())      
結果:      

C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py

False

Process finished with exit code 0

 這裡完美的展現了阻塞的魅力!!!!!!!

第2種并發運作:
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)      

并發 運作 aws 序列中的 ​​可等待對象​​。

下面來看個簡單的例子:

async def count(k):
    print(f"start job{k} {time.asctime()} ")
    await asyncio.sleep(0.6)
    print(f"end job{k} {time.asctime()}")
    return k
async def mayns():
    r=await asyncio.gather(count(1),count(2))
    return r
def test():
    import time
    st=time.perf_counter()
    results=asyncio.run(mayns())
    elapsed=time.perf_counter()-st
    print(f"result return :{results}  execute in {elapsed:0.2} seconds.")      

運作結果:

start job1 Fri Dec 13 23:00:38 2019

start job2 Fri Dec 13 23:00:38 2019

end job1 Fri Dec 13 23:00:39 2019

end job2 Fri Dec 13 23:00:39 2019

result return :[1, 2] execute in 0.6 seconds.

gather直接傳回的是調用的task的所有result清單

當然你也可以這樣搜集任務:

最後介紹一下如何實作異步http請求:      
# request 庫同步阻塞,aiohttp才是異步的請求,pip install aiohttp
from aiohttp import ClientSession as session
async def  test2(k):
    r=await other_test(k)
    return r
async def  other_test(k):
    print("start await job %s,%s"%(k,time.asctime()))
    urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
    async with session() as request:
        async with request.get(urls[0]) as rq:
            r=await  rq.read()
            res=r.decode("utf-8")
    print("end await job %s,%s"%(k,time.asctime()))
    return res

def test_aiohttp():
    klist=[100,50,88]
    loop=asyncio.get_event_loop()
    tasks=[asyncio.ensure_future(test2(k)) for k in klist]
    # loop.run_until_complete(asyncio.wait(tasks))
    #可通過asyncio.gather(*tasks)将響應全部收集起來
    res=loop.run_until_complete(asyncio.gather(*tasks))
    print(res)
    loop.close()      

  結果:

C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py

start await job 100,Fri Dec 13 23:54:31 2019

start await job 50,Fri Dec 13 23:54:31 2019

start await job 88,Fri Dec 13 23:54:31 2019

end await job 88,Fri Dec 13 23:54:31 2019

end await job 50,Fri Dec 13 23:54:31 2019

end await job 100,Fri Dec 13 23:54:31 2019

['{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}']

Process finished with exit code 0

服務是自己用djangO起的:      
from django.shortcuts import render
# Create your views here.
from django.http import HttpResponse
import json
from . models import Student,Grade
from django.db import models
def index(request):
    data={"user":"test001","msg":"this is test index view "}
    js=json.dumps(data)
    return HttpResponse(js)



def stuTable(request):
    import time
    time.sleep(3)
    # stu_object=Student.objects.all()
    # return  render(request,template_name="index.html",context={"student_object":stu_object})
    return HttpResponse(json.dumps({"A":888,"NN":899}))