天天看點

python協程實時輸出_一篇很有營養的幹貨 | 看懂Python中的異步協程,這篇就夠了!...

原标題:一篇很有營養的幹貨 | 看懂Python中的異步協程,這篇就夠了!

源 /進擊的Coder文 /崔慶才

01.

什麼是協程

協程,英文叫做 Coroutine,又稱微線程,纖程,協程是一種使用者态的輕量級線程。

協程擁有自己的寄存器上下文和棧。協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧。

我們可以使用協程來實作異步操作,比如在網絡爬蟲場景下,我們發出一個請求之後,需要等待一定的時間才能得到響應,但其實在這個等待過程中,程式可以幹許多其他的事情,等到響應得到之後才切換回來繼續處理,這樣可以充分利用 CPU 和其他資源,這就是異步協程的優勢。

02.定義協程

首先我們來定義一個協程,體驗一下它和普通程序在實作上的不同之處,代碼如下:

importasyncio

asyncdefexecute(x):

print('Number:', x)

coroutine = execute(1)

print('Coroutine:', coroutine)

print('After calling execute')

loop = asyncio.get_event_loop()

loop.run_until_complete(coroutine)

print('After calling loop')

運作結果:

Coroutine:

After calling execute

Number: 1

After calling loop

1).先import Python裡面大名鼎鼎的asyncio這個異步包,然後才可以使用 async 和 await。我們接着 async 定義了一個 execute() 方法,方法接收一個數字參數,方法執行之後會列印這個數字。

2).随後我們直接調用了這個方法,然而這個方法并沒有執行,而是傳回了一個 coroutine 協程對象。

3).随後我們使用 get_event_loop() 方法建立了一個事件循環 loop,并調用了 loop 對象的 run_until_complete() 方法将協程注冊到事件循環 loop 中,然後啟動。最後我們才看到了 execute() 方法列印了輸出結果。

4).可見,async 定義的方法就會變成一個無法直接執行的 coroutine 對象,必須将其注冊到事件循環中才可以執行。

5).上文我們還提到了 task,它是對 coroutine 對象的進一步封裝,它裡面相比 coroutine 對象多了運作狀态,比如 running、finished 等,我們可以用這些狀态來擷取協程對象的執行情況。

在上面的例子中,當我們将 coroutine 對象傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操作就是将 coroutine 封裝成了 task 對象,我們也可以顯式地進行聲明,如下所示:

importasyncio

asyncdefexecute(x):

print('Number:', x)

returnx

coroutine = execute(1)

print('Coroutine:', coroutine)

print('After calling execute')

loop = asyncio.get_event_loop()

task = loop.create_task(coroutine)

print('Task:', task)

loop.run_until_complete(task)

print('Task:', task)

print('After calling loop')

運作結果:

Coroutine:

After calling execute

Task: >

Number: 1

Task: result=1>

After calling loop

這裡我們定義了 loop 對象之後,接着調用了它的 create_task() 方法将 coroutine 對象轉化為了 task 對象

随後我們列印輸出一下,發現它是 pending 狀态。接着我們将 task 對象添加到事件循環中得到執行,

随後我們再列印輸出一下 task 對象,發現它的狀态就變成了 finished

同時還可以看到其 result 變成了 1,也就是我們定義的 execute() 方法的傳回結果。

另外定義 task 對象還有一種方式,就是直接通過 asyncio 的 ensure_future() 方法,傳回結果也是 task 對象,這樣的話我們就可以不借助于 loop 來定義,即使我們還沒有聲明 loop 也可以提前定義好 task 對象,寫法如下:

importasyncio

asyncdefexecute(x):

print('Number:', x)

returnx

coroutine = execute(1)

print('Coroutine:', coroutine)

print('After calling execute')

task = asyncio.ensure_future(coroutine)

print('Task:', task)

loop = asyncio.get_event_loop()

loop.run_until_complete(task)

print('Task:', task)

print('After calling loop')

運作結果:

Coroutine:

After calling execute

Task: >

Number: 1

Task: result=1>

After calling loop

發現其效果都是一樣的。

03.綁定回調

另外我們也可以為某個 task 綁定一個回調方法,來看下面的例子:

importasyncio

importrequests

asyncdefrequest():

url = 'https://www.baidu.com'

status = requests.get(url)

returnstatus

defcallback(task):

print('Status:', task.result())

coroutine = request()

task = asyncio.ensure_future(coroutine)

task.add_done_callback(callback)

print('Task:', task)

loop = asyncio.get_event_loop()

loop.run_until_complete(task)

print('Task:', task)

在這裡我們定義了一個 request() 方法,請求了百度,傳回狀态碼,但是這個方法裡面我們沒有任何 print() 語句。

随後我們定義了一個 callback() 方法,這個方法接收一個參數,是 task 對象,然後調用 print() 方法列印了 task 對象的結果。

這樣我們就定義好了一個 coroutine 對象和一個回調方法,我們現在希望的效果是,當 coroutine 對象執行完畢之後,就去執行聲明的 callback() 方法。

那麼它們二者怎樣關聯起來呢?很簡單,隻需要調用 add_done_callback() 方法即可,我們将 callback() 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢之後就可以調用 callback() 方法了。

同時 task 對象還會作為參數傳遞給 callback() 方法,調用 task 對象的 result() 方法就可以擷取傳回結果了。

運作結果:

Task: cb=[callback() at demo.py:11]>

Status:

Task: result=>

實際上不用回調方法,直接在 task 運作完畢之後也可以直接調用 result() 方法擷取結果,如下所示:

importasyncio

importrequests

asyncdefrequest():

url = 'https://www.baidu.com'

status = requests.get(url)

returnstatus

coroutine = request()

task = asyncio.ensure_future(coroutine)

print('Task:', task)

loop = asyncio.get_event_loop()

loop.run_until_complete(task)

print('Task:', task)

print('Task Result:', task.result())

運作結果是一樣的:

Task: >

Task: result=>

Task Result:

04.多任務協程

上面的例子我們隻執行了一次請求,如果我們想執行多次請求應該怎麼辦呢?我們可以定義一個 task 清單,然後使用 asyncio 的 wait() 方法即可執行,看下面的例子:

importasyncio

importrequests

asyncdefrequest():

url = 'https://www.baidu.com'

status = requests.get(url)

returnstatus

tasks = [asyncio.ensure_future(request()) for_ inrange(5)]

print('Tasks:', tasks)

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

fortask intasks:

print('Task Result:', task.result())

這裡我們使用一個 for 循環建立了五個 task,組成了一個清單,然後把這個清單首先傳遞給了 asyncio 的 wait() 方法,然後再将其注冊到時間循環中,就可以發起五個任務了。最後我們再将任務的運作結果輸出出來,運作結果如下:

Tasks: [>, >, >, >, >]

Task Result:

Task Result:

Task Result:

Task Result:

Task Result:

可以看到五個任務被順次執行了,并得到了運作結果。

05.一個實戰爬蟲例子,協程實作

前面說了這麼一通,又是 async,又是 coroutine,又是 task,又是 callback,但似乎并沒有看出協程的優勢啊?反而寫法上更加奇怪和麻煩了!

為了表現出協程的優勢,最好的方法就是模拟一個需要等待一定時間才可以擷取傳回結果的網頁,最好的方式是自己在本地模拟一個慢速伺服器,這裡我們選用 Flask,然後編寫伺服器代碼如下:

fromflask importFlask

importtime

app = Flask(__name__)

@app.route('/')

defindex():

time.sleep(3)

return'Hello!'

if__name__ == '__main__':

app.run(threaded=True)

這裡我們定義了一個 Flask 服務,主入口是 index() 方法,方法裡面先調用了 sleep() 方法休眠 3 秒,然後接着再傳回結果,也就是說,每次請求這個接口至少要耗時 3 秒,這樣我們就模拟了一個慢速的服務接口。

注意這裡服務啟動的時候,run() 方法加了一個參數 threaded,這表明 Flask 啟動了多線程模式,不然預設是隻有一個線程的。

如果不開啟多線程模式,同一時刻遇到多個請求的時候,隻能順次處理,這樣即使我們使用協程異步請求了這個服務,也隻能一個一個排隊等待,瓶頸就會出現在服務端。是以,多線程模式是有必要打開的。

啟動之後,Flask 應該預設會在 127.0.0.1:5000 上運作,運作之後控制台輸出結果如下:

* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

接下來我們再重新使用上面的方法請求一遍:

importasyncio

importrequests

importtime

start = time.time()

asyncdefrequest():

url = 'http://127.0.0.1:5000'

print('Waiting for', url)

response = requests.get(url)

print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for_ inrange(5)]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('Cost time:', end - start)

在這裡我們還是建立了五個 task,然後将 task 清單傳給 wait() 方法并注冊到時間循環中執行。

運作結果如下:

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Cost time: 15.049368143081665

可以發現和正常的請求并沒有什麼兩樣,依然還是順次執行的,耗時 15 秒,平均一個請求耗時 3 秒,說好的異步處理呢?

其實,要實作異步處理,我們得先要有挂起的操作,當一個任務需要等待 IO 結果的時候,可以挂起目前任務,轉而去執行其他任務,這樣我們才能充分利用好資源,上面方法都是一本正經的串行走下來,連個挂起都沒有,怎麼可能實作異步?想太多了。

要實作異步,接下來我們再了解一下 await 的用法,使用 await 可以将耗時等待的操作挂起,讓出控制權。當協程執行的時候遇到 await,時間循環就會将本協程挂起,轉而去執行别的協程,直到其他的協程挂起或執行完畢。

是以,我們可能會将代碼中的 request() 方法改成如下的樣子:

asyncdefrequest():

url = 'http://127.0.0.1:5000'

print('Waiting for', url)

response = awaitrequests.get(url)

print('Get response from', url, 'Result:', response.text)

僅僅是在 requests 前面加了一個 await,然而執行以下代碼,會得到如下報錯:

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Cost time: 15.048935890197754

Task exception was never retrieved

future: exception=TypeError("object Response can't be used in 'await' expression",)>

Traceback (most recent call last):

File "demo.py", line 10, inrequest

status = awaitrequests.get(url)

TypeError: object Response can't be used in 'await' expression

這次它遇到 await 方法确實挂起了,也等待了,但是最後卻報了這麼個錯,這個錯誤的意思是 requests 傳回的 Response 對象不能和 await 一起使用,為什麼呢?因為根據官方文檔說明,await 後面的對象必須是如下格式之一:

A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。

A generator-based coroutine object returned from a function decorated with types.coroutine(),一個由 types.coroutine() 修飾的生成器,這個生成器可以傳回 coroutine 對象。

An object with an await__ method returning an iterator,一個包含 __await 方法的對象傳回的一個疊代器。

reqeusts 傳回的 Response 不符合上面任一條件,是以就會報上面的錯誤了。那麼有的小夥伴就發現了,既然 await 後面可以跟一個 coroutine 對象,那麼我用 async 把請求的方法改成 coroutine 對象不就可以了嗎?是以就改寫成如下的樣子:

importasyncio

importrequests

importtime

start = time.time()

asyncdefget(url):

returnrequests.get(url)

asyncdefrequest():

url = 'http://127.0.0.1:5000'

print('Waiting for', url)

response = awaitget(url)

print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for_ inrange(5)]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('Cost time:', end - start)

這裡我們将請求頁面的方法獨立出來,并用 async 修飾,這樣就得到了一個 coroutine 對象,我們運作一下看看:

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Cost time: 15.134317874908447

還是不行,它還不是異步執行,也就是說我們僅僅将涉及 IO 操作的代碼封裝到 async 修飾的方法裡面是不可行的!我們必須要使用支援異步操作的請求方式才可以實作真正的異步,是以這裡就需要 aiohttp 派上用場了。

06.使用 aiohttp

aiohttp 是一個支援異步請求的庫,利用它和 asyncio 配合我們可以非常友善地實作異步請求操作。我們将 aiohttp 用上來,将代碼改成如下樣子:

importasyncio

importaiohttp

importtime

start = time.time()

asyncdefget(url):

session = aiohttp.ClientSession()

response = awaitsession.get(url)

result = awaitresponse.text()

session.close()

returnresult

asyncdefrequest():

url = 'http://127.0.0.1:5000'

print('Waiting for', url)

result = awaitget(url)

print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for_ inrange(5)]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('Cost time:', end - start)

在這裡我們将請求庫由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get() 方法進行請求,結果如下:

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Waiting forhttp://127.0.0.1:5000

Get response fromhttp://127.0.0.1:5000Result: Hello!

Get response fromhttp://127.0.0.1:5000Result: Hello!

Get response fromhttp://127.0.0.1:5000Result: Hello!

Get response fromhttp://127.0.0.1:5000Result: Hello!

Get response fromhttp://127.0.0.1:5000Result: Hello!

Cost time: 3.0199508666992188

成功了!我們發現這次請求的耗時由 15 秒變成了 3 秒,耗時直接變成了原來的 1/5。

這篇文章是慶才的大作,雖然很長,幹貨很多,需要慢慢消化,裡面的關鍵字非常多,大家動手敲敲代碼練習一下,方能體會Python協程的巧妙之處!

-END-

轉載聲明:本文選自「進擊的Coder」,搜尋「FightingCoder」即可關注

關注“Python學習聯盟”

點選菜單欄擷取更多幹貨!傳回搜狐,檢視更多

責任編輯: