From :廖雪峰 異步IO :https://www.liaoxuefeng.com/wiki/1016959663602400/1017959540289152
Python Async/Await入門指南 :https://zhuanlan.zhihu.com/p/27258289
Python 生成器 和 yield 關鍵字:https://blog.csdn.net/freeking101/article/details/51126293
協程與任務 官網文檔:https://docs.python.org/zh-cn/3/library/asyncio-task.html
Python中異步協程的使用方法介紹:https://blog.csdn.net/freeking101/article/details/88119858
python 協程詳解及I/O多路複用,I/O異步:https://blog.csdn.net/u014028063/article/details/81408395
Python協程深入了解:https://www.cnblogs.com/zhaof/p/7631851.html
asyncio 進階:Python黑魔法 --- 異步IO( asyncio) 協程:https://www.cnblogs.com/dhcn/p/9033628.html
談談Python協程技術的演進:https://www.freebuf.com/company-information/153421.html
最後推薦一下《流暢的Python》,這本書中 第16章 協程的部分介紹的非常詳細
《流暢的Python》pdf 下載下傳位址:https://download.csdn.net/download/freeking101/10993120
gevent 是 python 的一個并發架構,以微線程 greenlet 為核心,使用了 epoll 事件監聽機制以及諸多其他優化而變得高效。
異步 IO
在 IO 程式設計( 廖雪峰 Python IO 程式設計 :https://www.liaoxuefeng.com/wiki/1016959663602400/1017606916795776) 一節中,我們已經知道,CPU的速度遠遠快于磁盤、網絡等IO。在一個線程中,CPU執行代碼的速度極快,然而,一旦遇到IO操作,如讀寫檔案、發送網絡資料時,就需要等待IO操作完成,才能繼續進行下一步操作。這種情況稱為同步IO。
在IO操作的過程中,目前線程被挂起,而其他需要CPU執行的代碼就無法被目前線程執行了。
因為一個 IO 操作就阻塞了目前線程,導緻其他代碼無法執行,是以我們必須使用多線程或者多程序來并發執行代碼,為多個使用者服務。每個使用者都會配置設定一個線程,如果遇到IO導緻線程被挂起,其他使用者的線程不受影響。
多線程和多程序的模型雖然解決了并發問題,但是系統不能無上限地增加線程。由于系統切換線程的開銷也很大,是以,一旦線程數量過多,CPU的時間就花線上程切換上了,真正運作代碼的時間就少了,結果導緻性能嚴重下降。
由于我們要解決的問題是CPU高速執行能力和IO裝置的龜速嚴重不比對,多線程和多程序隻是解決這一問題的一種方法。
另一種解決IO問題的方法是異步IO。當代碼需要執行一個耗時的IO操作時,它隻發出IO指令,并不等待IO結果,然後就去執行其他代碼了。一段時間後,當IO傳回結果時,再通知CPU進行處理。
消息模型 其實早在應用在桌面應用程式中了。一個 GUI 程式的主線程就負責不停地讀取消息并處理消息。所有的鍵盤、滑鼠等消息都被發送到GUI程式的消息隊列中,然後由GUI程式的主線程處理。
由于GUI 線程處理鍵盤、滑鼠等消息的速度非常快,是以使用者感覺不到延遲。某些時候,GUI線程在一個消息處理的過程中遇到問題導緻一次消息處理時間過長,此時,使用者會感覺到整個GUI程式停止響應了,敲鍵盤、點滑鼠都沒有反應。這種情況說明在消息模型中,處理一個消息必須非常迅速,否則,主線程将無法及時處理消息隊列中的其他消息,導緻程式看上去停止響應。
消息模型 是 如何解決 同步IO 必須等待IO操作這一問題的呢 ?
在消息處理過程中,當遇到 IO 操作時,代碼隻負責發出IO請求,不等待IO結果,然後直接結束本輪消息處理,進入下一輪消息處理過程。當IO操作完成後,将收到一條“IO完成”的消息,處理該消息時就可以直接擷取IO操作結果。
在 “發出IO請求” 到收到 “IO完成” 的這段時間裡,同步IO模型下,主線程隻能挂起,但異步IO模型下,主線程并沒有休息,而是在消息循環中繼續處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數IO密集型的應用程式,使用異步IO将大大提升系統的多任務處理能力。
協程 (Coroutines)
在學習異步IO模型前,我們先來了解協程,協程 又稱 微線程,纖程,英文名 Coroutine。
子程式( 又叫 函數 ) 和 協程
- 子程式 在 所有語言中都是層級調用。比如: A 調用 B,B 在執行過程中又調用了 C,C 執行完畢傳回,B 執行完畢傳回,最後是 A 執行完畢。是以 子程式 即 函數 的調用是通過棧實作的,一個線程就是執行一個子程式。子程式調用總是一個入口,一次傳回,調用順序是明确的。
- 協程的調用 和 子程式 不同。協程 看上去也是 子程式,但執行過程中,在子程式内部可中斷,然後轉而執行别的子程式,在适當的時候再傳回來接着執行。
注意,在一個 子程式中中斷,去執行其他子程式,不是函數調用,有點類似CPU的中斷。比如:子程式 A 和 B :
def A():
print('1')
print('2')
print('3')
def B():
print('x')
print('y')
print('z')
假設由協程執行,在執行 A 的過程中,可以随時中斷,去執行 B,B 也可能在執行過程中中斷再去執行 A,結果可能是:
1
2
x
y
3
z
但是在 A 中是沒有調用 B 的,是以 協程的調用 比 函數調用 了解起來要難一些。
看起來 A、B 的執行有點像多線程,但 協程 的特點在于是一個線程執行。
協程 和 多線程比,協程有何優勢?
- 1. 最大的優勢就是協程極高的執行效率。因為 子程式 切換不是線程切換,而是由程式自身控制,是以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。
- 2. 第二大優勢就是不需要多線程的鎖機制,因為隻有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,隻需要判斷狀态就好了,是以執行效率比多線程高很多。
因為協程是一個線程執行,那怎麼利用多核CPU呢?
最簡單的方法是 多程序 + 協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。
Python 對 協程 的支援 是通過 generator (生成器)實作的
在 generator 中,我們不但可以通過 for 循環來疊代,還可以不斷調用 next() 函數擷取由 yield 語句傳回的下一個值。
但是 Python 的 yield 不但可以傳回一個值,它還可以接收調用者發出的參數。
來看例子:
傳統的 生産者-消費者 模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。如果改用協程,生産者生産消息後,直接通過 yield 跳轉到消費者開始執行,待消費者執行完畢後,切換回生産者繼續生産,效率極高:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author :
# @File : text.py
# @Software : PyCharm
# @description : XXX
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
執行結果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到 consumer函數 是一個 generator,把一個 consumer 傳入 produce 後:
- 首先調用 c.send(None) 啟動生成器;
- 然後,一旦生産了東西,通過 c.send(n) 切換到 consumer 執行;
- consumer 通過 yield拿到消息,處理,又通過yield把結果傳回;
- produce 拿到 consumer 處理的結果,繼續生産下一條消息;
- produce 決定不生産了,通過 c.close() 關閉 consumer,整個過程結束。
整個流程無鎖,由一個線程執行,
produce
和
consumer
協作完成任務,是以稱為 “協程”,而非線程的搶占式多任務。
最後套用 Donald Knuth 的一句話總結協程的特點:“子程式就是協程的一種特例。”
參考源碼:https://github.com/michaelliao/learn-python3/blob/master/samples/async/coroutine.py
在 Python 中,異步函數 通常 被稱作 協程
建立一個協程僅僅隻需使用 async 關鍵字,或者使用 @asyncio.coroutine 裝飾器。下面的任一代碼,都可以作為協程工作,形式上也是等同的:
import asyncio
# 方式 1
async def ping_server(ip):
pass
# 方式 2
@asyncio.coroutine
def load_file(path):
pass
上面這兩個 特殊的函數,在調用時會傳回協程對象。熟悉 JavaScript 中 Promise 的同學,可以把這個傳回對象當作跟 Promise 差不多。調用他們中的任意一個,實際上并未立即運作,而是傳回一個協程對象,然後将其傳遞到 Eventloop 中,之後再執行。
- 如何判斷一個 函數是不是協程 ? asyncio 提供了 asyncio.iscoroutinefunction(func) 方法。
- 如何判斷一個 函數傳回的是不是協程對象 ? 可以使用 asyncio.iscoroutine(obj) 。
用 asyncio 提供的 @asyncio.coroutine 可以把一個 generator 标記為 coroutine 類型,然後在 coroutine 内部用 yield from 調用另一個 coroutine 實作異步操作。
Python 3.5 開始引入了新的文法 async 和 await
為了簡化并更好地辨別異步 IO,從 Python 3.5 開始引入了新的文法 async 和 await,可以讓 coroutine 的代碼更簡潔易讀。
async / await 是 python3.5 的新文法,需使用 Python3.5 版本 或 以上才能正确運作。
注意:async 和 await 是針對 coroutine 的新文法,要使用新的文法,隻需要做兩步簡單的替換:
- 把 @asyncio.coroutine 替換為 async
- 把 yield from 替換為 await
Python 3.5 以前 版本原來老的文法使用 協程
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")
Python 3.5 以後 用新文法重新編寫如下:
import asyncio
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
在過去幾年内,異步程式設計由于某些好的原因得到了充分的重視。雖然它比線性程式設計難一點,但是效率相對來說也是更高。
比如,利用 Python 的 異步協程 (async coroutine) ,在送出 HTTP 請求後,就沒必要等待請求完成再進一步操作,而是可以一邊等着請求完成,一邊做着其他工作。這可能在邏輯上需要多些思考來保證程式正确運作,但是好處是可以利用更少的資源做更多的事。
即便邏輯上需要多些思考,但實際上在 Python 語言中,異步程式設計的文法和執行并不難。跟 Javascript 不一樣,現在 Python 的異步協程已經執行得相當好了。
對于服務端程式設計,異步性似乎是 Node.js 流行的一大原因。我們寫的很多代碼,特别是那些諸如網站之類的高 I/O 應用,都依賴于外部資源。這可以是任何資源,包括從遠端資料庫調用到 POST 一個 REST 請求。一旦你請求這些資源的任一一個,你的代碼在等待資源響應時便無事可做 (譯者注:如果沒有異步程式設計的話)。
有了異步程式設計,在等待這些資源響應的過程中,你的代碼便可以去處理其他的任務。
Python async / await 手冊
Python 部落:Python async/await 手冊:https://python.freelycode.com/contribution/detail/57
知乎:從 0 到 1,Python 異步程式設計的演進之路( 通過爬蟲示範進化之路 ):https://zhuanlan.zhihu.com/p/25228075
async / await 的使用
async 用來聲明一個函數是協程,然後使用 await 調用這個協程, await 必須在函數内部,這個函數通常也被聲明為另一個協程。await 的目的是等待協程控制流的傳回。yield 的目的 是 暫停并挂起函數的操作。
正常的函數在執行時是不會中斷的,是以你要寫一個能夠中斷的函數,就需要添加 async 關鍵。
- async 用來聲明一個函數為異步函數,異步函數的特點是能在函數執行過程中挂起,去執行其他異步函數,等到挂起條件(假設挂起條件是sleep(5))消失後,也就是5秒到了再回來執行。
- await 可以将耗時等待的操作挂起,讓出控制權( await 文法來挂起自身的協程 )。比如:異步程式執行到某一步時需要等待的時間很長,就将此挂起,去執行其他的異步程式。await 後面隻能跟 異步程式 或 有 __await__ 屬性 的 對象,因為異步程式與一般程式不同。
假設有兩個異步函數 async a,async b,a 中的某一步有 await,當程式碰到關鍵字 await b() 後,異步程式挂起後去執行另一個異步b程式,就是從函數内部跳出去執行其他函數,當挂起條件消失後,不管b是否執行完,要馬上從b程式中跳出來,回到原程式執行原來的操作。如果 await 後面跟的 b 函數不是異步函數,那麼操作就隻能等 b 執行完再傳回,無法在 b 執行的過程中傳回。如果要在 b 執行完才傳回,也就不需要用 await 關鍵字了,直接調用 b 函數就行。是以這就需要 await 後面跟的是 異步函數了。在一個異步函數中,可以不止一次挂起,也就是可以用多個 await 。
看下 Python 中常見的幾種函數形式:
# 1. 普通函數
def function():
return 1
# 2. 生成器函數
def generator():
yield 1
# 在3.5過後,我們可以使用async修飾将普通函數和生成器函數包裝成異步函數和異步生成器。
# 3. 異步函數(協程)
async def async_function():
return 1
# 4. 異步生成器
async def async_generator():
yield 1
通過類型判斷可以驗證函數的類型
import types
# 1. 普通函數
def function():
return 1
# 2. 生成器函數
def generator():
yield 1
# 在3.5過後,我們可以使用async修飾将普通函數和生成器函數包裝成異步函數和異步生成器。
# 3. 異步函數(協程)
async def async_function():
return 1
# 4. 異步生成器
async def async_generator():
yield 1
print(type(function) is types.FunctionType)
print(type(generator()) is types.GeneratorType)
print(type(async_function()) is types.CoroutineType)
print(type(async_generator()) is types.AsyncGeneratorType)
直接調用異步函數不會傳回結果,而是傳回一個coroutine對象:
print(async_function())
# <coroutine object async_function at 0x102ff67d8>
協程 需要通過其他方式來驅動,是以可以使用這個協程對象的 send 方法給協程發送一個值:
print(async_function().send(None))
不幸的是,如果通過上面的調用會抛出一個異常:StopIteration: 1
因為 生成器 / 協程 在正常傳回退出時會抛出一個 StopIteration 異常,而原來的傳回值會存放在 StopIteration 對象的 value 屬性中,通過以下捕獲可以擷取協程真正的傳回值:
try:
async_function().send(None)
except StopIteration as e:
print(e.value)
# 1
通過上面的方式來建立一個 run 函數來驅動協程函數,在協程函數中,可以通過 await 文法來挂起自身的協程,并等待另一個 協程 完成直到傳回結果:
def run(coroutine):
try:
coroutine.send(None)
except StopIteration as e:
return 'run() : return {0}'.format(e.value)
async def async_function():
return 1
async def await_coroutine():
result = await async_function()
print('await_coroutine() : print {0} '.format(result))
ret_val = run(await_coroutine())
print(ret_val)
要注意的是,await 文法隻能出現在通過 async 修飾的函數中,否則會報 SyntaxError 錯誤。
而且 await 後面的對象需要是一個 Awaitable,或者實作了相關的協定。
檢視 Awaitable 抽象類的代碼,表明了隻要一個類實作了__await__方法,那麼通過它構造出來的執行個體就是一個 Awaitable:
class Awaitable(metaclass=ABCMeta):
__slots__ = ()
@abstractmethod
def __await__(self):
yield
@classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, "__await__")
return NotImplemented
而且可以看到,Coroutine類 也繼承了 Awaitable,而且實作了 send,throw 和 close 方法。是以 await 一個調用異步函數傳回的協程對象是合法的。
class Coroutine(Awaitable):
__slots__ = ()
@abstractmethod
def send(self, value):
...
@abstractmethod
def throw(self, typ, val=None, tb=None):
...
def close(self):
...
@classmethod
def __subclasshook__(cls, C):
if cls is Coroutine:
return _check_methods(C, '__await__', 'send', 'throw', 'close')
return NotImplemented
接下來是異步生成器,來看一個例子:
假如我要到一家超市去購買洋芋,而超市貨架上的洋芋數量是有限的:
class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos
all_potatos = Potato.make(5)
現在我想要買50個洋芋,每次從貨架上拿走一個洋芋放到籃子:
def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
sleep(.1)
else:
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
def buy_potatos():
bucket = []
for p in take_potatos(50):
bucket.append(p)
對應到代碼中,就是疊代一個生成器的模型,顯然,當貨架上的洋芋不夠的時候,這時隻能夠死等,而且在上面例子中等多長時間都不會有結果(因為一切都是同步的),也許可以用多程序和多線程解決,而在現實生活中,更應該像是這樣的:
import asyncio
import random
class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos
all_potatos = Potato.make(5)
async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
async def ask_for_potato():
await asyncio.sleep(random.random())
all_potatos.extend(Potato.make(random.randint(1, 10)))
async def buy_potatos():
bucket = []
async for p in take_potatos(50):
bucket.append(p)
print(f'Got potato {id(p)}...')
def main():
loop = asyncio.get_event_loop()
res = loop.run_until_complete(buy_potatos())
loop.close()
if __name__ == '__main__':
main()
當貨架上的洋芋沒有了之後,可以詢問超市請求需要更多的洋芋,這時候需要等待一段時間直到生産者完成生産的過程。
當生産者完成和傳回之後,這是便能從 await 挂起的地方繼續往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器疊代的流程。
用 asyncio 運作這段代碼,結果是這樣的:
Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
...
既然是異步的,在請求之後不一定要死等,而是可以做其他事情。比如除了洋芋,我還想買番茄,這時隻需要在事件循環中再添加一個過程:
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
loop.close()
再來運作這段代碼:
Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
...
看下 AsyncGenerator 的定義,它需要實作 __aiter__ 和 __anext__ 兩個核心方法,以及 asend,athrow,aclose 方法。
class AsyncGenerator(AsyncIterator):
__slots__ = ()
async def __anext__(self):
...
@abstractmethod
async def asend(self, value):
...
@abstractmethod
async def athrow(self, typ, val=None, tb=None):
...
async def aclose(self):
...
@classmethod
def __subclasshook__(cls, C):
if cls is AsyncGenerator:
return _check_methods(C, '__aiter__', '__anext__',
'asend', 'athrow', 'aclose')
return NotImplemented
異步生成器是在 3.6 之後才有的特性,同樣的還有異步推導表達式,是以在上面的例子中,也可以寫成這樣:
bucket = [p async for p in take_potatos(50)]
類似的,還有 await 表達式:
result = [await fun() for fun in funcs if await condition()]
除了函數之外,類執行個體的普通方法也能用 async 文法修飾:
class ThreeTwoOne:
async def begin(self):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
t = ThreeTwoOne()
await t.begin()
print('start')
執行個體方法的調用同樣是傳回一個 coroutine:
function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())
同理 還有類方法:
class ThreeTwoOne:
@classmethod
async def begin(cls):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
await ThreeTwoOne.begin()
print('start')
根據PEP 492中,async 也可以應用到 上下文管理器中,__aenter__ 和 __aexit__ 需要傳回一個 Awaitable:
class GameContext:
async def __aenter__(self):
print('game loading...')
await asyncio.sleep(1)
async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)
async def game():
async with GameContext():
print('game start...')
await asyncio.sleep(2)
在3.7版本,contextlib 中會新增一個 asynccontextmanager 裝飾器來包裝一個實作異步協定的上下文管理器:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
conn = await acquire_db_connection()
try:
yield
finally:
await release_db_connection(conn)
async 修飾符也能用在 __call__ 方法上:
class GameContext:
async def __aenter__(self):
self._started = time()
print('game loading...')
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)
async def __call__(self, *args, **kws):
if args[0] == 'time':
return time() - self._started
async def game():
async with GameContext() as ctx:
print('game start...')
await asyncio.sleep(2)
print('game time: ', await ctx('time'))
asyncio
asyncio
是 Python 3.4 版本引入的标準庫,直接内置了對 異步 IO 的支援。
asyncio 官方隻實作了比較底層的協定,比如TCP,UDP。是以諸如 HTTP 協定之類都需要借助第三方庫,比如 aiohttp 。
雖然異步程式設計的生态不夠同步程式設計的生态那麼強大,但是如果有高并發的需求不妨試試,下面說一下比較成熟的異步庫
aiohttp:異步 http client/server架構。github位址: https://github.com/aio-libs/aiohttp
sanic:速度更快的類 flask web架構。github位址:https://github.com/channelcat/sanic
uvloop 快速,内嵌于 asyncio 事件循環的庫,使用 cython 基于 libuv 實作。github位址: https://github.com/MagicStack/uvloop
asyncio
的程式設計模型就是一個 消息循環。我們從
asyncio
子產品中直接擷取一個
EventLoop
的引用,然後把需要執行的協程扔到
EventLoop
中執行,就實作了 異步IO。
python 用asyncio
子產品實作異步程式設計,該子產品最大特點就是,隻存在一個線程
由于隻有一個線程,就不可能多個任務同時運作。asyncio 是 "多任務合作" 模式(cooperative multitasking),允許異步任務交出執行權給其他任務,等到其他任務完成,再收回執行權繼續往下執行
asyncio 子產品在單線程上啟動一個事件循環(event loop),時刻監聽新進入循環的事件,加以處理,并不斷重複這個過程,直到異步任務結束。
什麼是事件循環?
單線程就意味着所有的任務需要在單線程上排隊執行,也就是前一個任務沒有執行完成,後一個任務就沒有辦法執行。在CPU密集型的任務之中,這樣其實還行,但是如果我們的任務都是IO密集型的呢?也就是我們大部分的任務都是在等待網絡的資料傳回,等待磁盤檔案的資料,這就會造成CPU一直在等待這些任務的完成再去執行下一個任務。
有沒有什麼辦法能夠讓單線程的任務執行不這麼笨呢?其實我們可以将這些需要等待IO裝置的任務挂在一邊嘛!這時候,如果我們的任務都是需要等待的任務,那麼單線程在執行時遇到一個就把它挂起來,這裡可以通過一個資料結構(例如隊列)将這些處于執行等待狀态的任務放進去,為什麼是執行等待狀态呢?因為它們正在執行但是又不得不等待例如網絡資料的傳回等等。直到将所有的任務都放進去之後,單線程就可以開始它的接連不斷的表演了:有沒有任務完成的小夥伴呀!快來我這裡執行!
此時如果有某個任務完成了,它會得到結果,于是發出一個信号:我完成了。那邊還在循環追問的單線程終于得到了答複,就會去看看這個任務有沒有綁定什麼回調函數呀?如果綁定了回調函數就進去把回調函數給執行了,如果沒有,就将它所在的任務恢複執行,并将結果傳回。
asyncio 就是一個 協程庫
- (1)事件循環 (event loop)。事件循環需要實作兩個功能,一是順序執行協程代碼;二是完成協程的排程,即一個協程“暫停”時,決定接下來執行哪個協程。
- (2)協程上下文的切換。基本上Python 生成器的 yeild 已經能完成切換,Python3中還有特定文法支援協程切換。
Python 的異步IO:API
官方文檔:https://docs.python.org/zh-cn/3/library/asyncio.html
Python 的 asyncio 是使用 async/await 文法編寫并發代碼的标準庫。Python3.7 這個版本,asyncio又做了比較大的調整,把這個庫的 API 分為了 高層級API 和 低層級API,并引入asyncio.run() 這樣的進階方法,讓編寫異步程式更加簡潔。
這裡先從全局認識 Python 這個異步IO庫。
asyncio 的 高層級 API 主要提高如下幾個方面:
- 并發地運作Python協程并完全控制其執行過程;
- 執行網絡IO和IPC;
- 控制子程序;
- 通過隊列實作分布式任務;
- 同步并發代碼。
asyncio 的 低層級API 用以支援開發異步庫和架構:
- 建立和管理事件循環(event loop),提供異步的API用于網絡,運作子程序,處理作業系統信号等;
- 通過 transports 實作高效率協定;
- 通過 async/await 文法橋架基于回調的庫和代碼。
asyncio 進階 API
高層級API讓我們更友善的編寫基于asyncio的應用程式。這些API包括:
(1)協程和任務
協程通過 async/await 文法進行聲明,是編寫異步應用的推薦方式。曆史的
@asyncio.coroutine
和
yield from
已經被棄用,并計劃在Python 3.10中移除。協程可以通過
asyncio.run(coro, *, debug=False)
函數運作,該函數負責管理事件循環并完結異步生成器。它應該被用作asyncio程式的主入口點,相當于main函數,應該隻被調用一次。
任務被用于并發排程協程,可用于網絡爬蟲的并發。使用
asyncio.create_task()
就可以把一個協程打包為一個任務,該協程會自動安排為很快運作。
協程,任務和Future都是可等待對象。其中,Future是低層級的可等待對象,表示一個異步操作的最終結果。
(2)流
流是用于網絡連接配接的高層級的使用 async/await的原語。流允許在不使用回調或低層級協定和傳輸的情況下發送和接收資料。異步讀寫TCP有用戶端函數
asyncio.open_connection()
和 服務端函數
asyncio.start_server()
。它還支援 Unix Sockets:
asyncio.open_unix_connection()
和
asyncio.start_unix_server()
。
(3)同步原語
asyncio同步原語的設計類似于threading子產品的原語,有兩個重要的注意事項:
asyncio原語不是線程安全的,是以它們不應該用于OS線程同步(而是用threading)
這些同步原語的方法不接受逾時參數; 使用
asyncio.wait_for()
函數執行逾時操作。
asyncio具有以下基本同步原語:
- Lock
- Event
- Condition
- Semaphore
- BoundedSemaphore
(4)子程序
asyncio提供了通過 async/await 建立和管理子程序的API。不同于Python标準庫的subprocess,asyncio的子程序函數都是異步的,并且提供了多種工具來處理這些函數,這就很容易并行執行和監視多個子程序。建立子程序的方法主要有兩個:
coroutine asyncio.create_subprocess_exec()
coroutine asyncio.create_subprocess_shell()
(5)隊列
asyncio 隊列的設計類似于标準子產品queue的類。雖然asyncio隊列不是線程安全的,但它們被設計為專門用于 async/await 代碼。需要注意的是,asyncio隊列的方法沒有逾時參數,使用
asyncio.wait_for()
函數進行逾時的隊列操作。
因為和标注子產品queue的類設計相似,使用起來跟queue無太多差異,隻需要在對應的函數前面加 await 即可。asyncio 隊列提供了三種不同的隊列:
- class asyncio.Queue 先進先出隊列
- class asyncio.PriorityQueue 優先隊列
- class asyncio.LifoQueue 後進先出隊列
(6)異常
asyncio提供了幾種異常,它們是:
- TimeoutError,
- CancelledError,
- InvalidStateError,
- SendfileNotAvailableError
- IncompleteReadError
- LimitOverrunError
asyncio低級API
低層級API為編寫基于asyncio的庫和架構提供支援,有意編寫異步庫和架構的大牛們需要熟悉這些低層級API。主要包括:
(1)事件循環
事件循環是每個asyncio應用程式的核心。 事件循環運作異步任務和回調,執行網絡IO操作以及運作子程序。
應用程式開發人員通常應該使用進階asyncio函數,例如
asyncio.run()
,并且很少需要引用循環對象或調用其方法。
Python 3.7 新增了
asyncio.get_running_loop()
函數。
(2)Futures
Future對象用于将基于低層級回調的代碼與高層級的 async/await 代碼進行橋接。
Future表示異步操作的最終結果。 不是線程安全的。
Future是一個可等待對象。 協程可以等待Future對象,直到它們有結果或異常集,或者直到它們被取消。
通常,Futures用于啟用基于低層級回調的代碼(例如,在使用asyncio傳輸實作的協定中)以與高層級 async/await 代碼進行互操作。
(3)傳輸和協定(Transports和Protocols)
Transport 和 Protocol由低層級事件循環使用,比如函數
loop.create_connection()
。它們使用基于回調的程式設計風格,并支援網絡或IPC協定(如HTTP)的高性能實作。
在最進階别,傳輸涉及位元組的傳輸方式,而協定确定要傳輸哪些位元組(在某種程度上何時傳輸)。
換種方式說就是:傳輸是套接字(或類似的I/O端點)的抽象,而協定是從傳輸的角度來看的應用程式的抽象。
另一種觀點是傳輸和協定接口共同定義了一個使用網絡I/O和程序間I/O的抽象接口。
傳輸和協定對象之間始終存在1:1的關系:協定調用傳輸方法來發送資料,而傳輸調用協定方法來傳遞已接收的資料。
大多數面向連接配接的事件循環方法(例如
loop.create_connection()
)通常接受protocol_factory參數,該參數用于為接受的連接配接建立Protocol對象,由Transport對象表示。 這些方法通常傳回(傳輸,協定)元組。
(4)政策(Policy)
事件循環政策是一個全局的按程序劃分的對象,用于控制事件循環的管理。 每個事件循環都有一個預設政策,可以使用政策API對其進行更改和自定義。
政策定義了上下文的概念,并根據上下文管理單獨的事件循環。 預設政策将上下文定義為目前線程。
通過使用自定義事件循環政策,可以自定義
get_event_loop()
,
set_event_loop()
和
new_event_loop()
函數的行為。
(5)平台支援
asyncio子產品設計為可移植的,但由于平台的底層架構和功能,某些平台存在細微的差異和限制。在Windows平台,有些是不支援的,比如
loop.create_unix_connection()
and
loop.create_unix_server()
。而Linux和比較新的macOS全部支援。
總結
Python 3.7 通過對 asyncio 分組使得它的架構更加清晰,普通寫異步IO的應用程式隻需熟悉高層級API,需要寫異步IO的庫和架構時才需要了解低層級的API。
生産者 --- 消費者
Python 分布與并行 asyncio實作生産者消費者模型:https://blog.csdn.net/weixin_43594279/article/details/111243453
示例 1:
# coding=utf-8
import asyncio
async def consumer(n, q):
print('consumer {}: starting'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
if item is None:
# None is the signal to stop.
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()
print('consumer {}: ending'.format(n))
async def producer(q, num_workers):
print('producer: starting')
# Add some numbers to the queue to simulate jobs
for i in range(num_workers * 3):
await q.put(i)
print('producer: added task {} to the queue'.format(i))
# Add None entries in the queue
# to signal the consumers to exit
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')
async def main(num_consumers=1):
q = asyncio.Queue(maxsize=num_consumers)
consumer_list = [
# asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
asyncio.ensure_future(consumer(i, q)) for i in range(num_consumers)
]
# produce_list = [asyncio.create_task(producer(q, num_consumers))]
produce_list = [asyncio.ensure_future(producer(q, num_consumers))]
task_list = consumer_list + produce_list
for item in task_list:
await item
if __name__ == '__main__':
asyncio.run(main(num_consumers=3))
pass
示例 2:
Python 的異步IO程式設計例子
以 Python 3.7 上的 asyncio 為例講解如何使用 Python 的異步 IO。
建立第一個協程
Python 3.7 推薦使用 async/await 文法來聲明協程,來編寫異步應用程式。我們來建立第一個協程函數:首先列印一行“你好”,等待1秒鐘後再列印 "大家同好"。
import asyncio
async def say_hi():
print('你好')
await asyncio.sleep(1)
print('大家同好')
asyncio.run(say_hi())
"""
你好
大家同好
"""
say_hi() 函數通過 async 聲明為協程函數,較之前的修飾器聲明更簡潔明了。
在實踐過程中,什麼功能的函數要用 async 聲明為協程函數呢?就是那些能發揮異步IO性能的函數,比如讀寫檔案、讀寫網絡、讀寫資料庫,這些都是浪費時間的IO操作,把它們協程化、異步化進而提高程式的整體效率(速度)。
say_hi() 函數是通過
asyncio.run()
來運作的,而不是直接調用這個函數(協程)。因為,直接調用并不會把它加入排程日程,而隻是簡單的傳回一個協程對象:
print(say_hi()) # <coroutine object say_hi at 0x000001264DB3FCC0>
真正運作一個協程
那麼,如何真正運作一個協程呢?
asyncio 提供了三種機制:
- (1)asyncio.run() 函數,這是異步程式的主入口,相當于C語言中的main函數。
- (2)用 await 等待協程,比如上例中的
。await asyncio.sleep(1)
再看下面的例子,我們定義了協程
say_delay()
,在 main() 協程中調用兩次,第一次延遲1秒後列印“你好”,第二次延遲2秒後列印 "大家同好"。這樣我們通過 await 運作了兩個協程。
import asyncio
import datetime
async def say_delay(msg=None, delay=None):
await asyncio.sleep(delay)
print(msg)
async def main():
print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')
await say_delay('你好', 2)
await say_delay('大家同好', 1)
print(f'end at {datetime.datetime.now().replace(microsecond=0)}')
asyncio.run(main())
'''
begin at 2020-12-19 00:55:01
你好
大家同好
end at 2020-12-19 00:55:04
'''
從起止時間可以看出,兩個協程是順序執行的,總共耗時1+2=3秒。
- (3)通過
函數并發運作作為 asyncio 任務(Task) 的多個協程。下面,我們用 create_task() 來修改上面的 main() 協程,進而讓兩個 say_delay() 協程并發運作:asyncio.create_task()
import asyncio
import datetime
async def say_delay(msg=None, delay=None):
await asyncio.sleep(delay)
print(msg)
async def main():
task_1 = asyncio.create_task(say_delay('你好', 2))
task_2 = asyncio.create_task(say_delay('大家同好', 1))
print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')
await task_1
await task_2
print(f'end at {datetime.datetime.now().replace(microsecond=0)}')
asyncio.run(main())
'''
begin at 2020-12-19 00:58:20
大家同好
你好
end at 2020-12-19 00:58:22
'''
從運作結果的起止時間可以看出,兩個協程是并發執行的了,總耗時等于最大耗時2秒。
是一個很有用的函數,在爬蟲中它可以幫助我們實作大量并發去下載下傳網頁。在 Python 3.6中與它對應的是
asyncio.create_task()
。
ensure_future()
生産者 --- 消費者
示例 代碼:
# coding=utf-8
import asyncio
async def consumer(n, q):
print('consumer {}: starting'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
if item is None:
# None is the signal to stop.
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()
print('consumer {}: ending'.format(n))
async def producer(q, num_workers):
print('producer: starting')
# Add some numbers to the queue to simulate jobs
for i in range(num_workers * 3):
await q.put(i)
print('producer: added task {} to the queue'.format(i))
# Add None entries in the queue
# to signal the consumers to exit
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')
async def main(num_consumers=1):
q = asyncio.Queue(maxsize=num_consumers)
consumer_list = [
asyncio.create_task(consumer(i, q)) for i in range(num_consumers)
]
produce_list = [asyncio.create_task(producer(q, num_consumers))]
task_list = consumer_list + produce_list
for item in task_list:
await item
if __name__ == '__main__':
asyncio.run(main(num_consumers=3))
pass
可等待對象(awaitables)
可等待對象,就是可以在 await 表達式中使用的對象,前面我們已經接觸了兩種可等待對象的類型:協程 和 任務,還有一個是低層級的 Future。
asyncio 子產品的許多 API 都需要傳入可等待對象,比如 run(), create_task() 等等。
(1)協程
協程是可等待對象,可以在其它協程中被等待。協程兩個緊密相關的概念是:
- 協程函數:通過 async def 定義的函數;
- 協程對象:調用協程函數傳回的對象。
運作上面這段程式,結果為:
co is
now is 1548512708.2026224
now is 1548512708.202648
可以看到,直接運作協程函數 whattime()得到的co是一個協程對象,因為協程對象是可等待的,是以通過 await 得到真正的目前時間。now2是直接await 協程函數,也得到了目前時間的傳回值。
(2)任務
前面我們講到,任務是用來排程協程的,以便并發執行協程。當一個協程通過
asyncio.create_task()
被打包為一個 任務,該協程将自動加入排程隊列中,但是還未執行。
create_task() 的基本使用前面例子已經講過。它傳回的 task 通過 await 來等待其運作完。如果,我們不等待,會發生什麼?“準備立即運作”又該如何了解呢?先看看下面這個例子:
運作這段代碼的情況是這樣的:首先,1秒鐘後列印一行,這是第13,14行代碼運作的結果:
calling:0, now is 09:15:15
接着,停頓1秒後,連續列印4行:
calling:1, now is 09:15:16
calling:2, now is 09:15:16
calling:3, now is 09:15:16
calling:4, now is 09:15:16
從這個結果看,
asyncio.create_task()
産生的4個任務,我們并沒有
await
,它們也執行了。關鍵在于第18行的
await
,如果把這一行去掉或是 sleep 的時間小于1秒(比whattime()裡面的sleep時間少即可),就會隻看到第一行的輸出結果而看不到後面四行的輸出。這是因為,main() 不 sleep 或 sleep 少于1秒鐘,main() 就在 whattime() 還未來得及列印結果(因為,它要sleep 1秒)就退出了,進而整個程式也退出了,就沒有 whattime() 的輸出結果。
再來了解一下 “準備立即執行” 這個說法。它的意思就是,create_task() 隻是打包了協程并加入排程隊列還未執行,并準備立即執行,什麼時候執行呢?在 “主協程”(調用create_task()的協程)挂起的時候,這裡的“挂起”有兩個方式:
- 一是,通過 await task 來執行這個任務;
- 另一個是,主協程通過 await sleep 挂起,事件循環就去執行task了。
我們知道,asyncio 是通過事件循環實作異步的。在主協程 main()裡面,沒有遇到 await 時,事件就是執行 main() 函數,遇到 await 時,事件循環就去執行别的協程,即 create_task() 生成的 whattime()的4個任務,這些任務一開始就是 await sleep 1秒。這時候,主協程和4個任務協程都挂起了,CPU空閑,事件循環等待協程的消息。
如果 main() 協程隻 sleep了 0.1秒,它就先醒了,給事件循環發消息,事件循環就來繼續執行 main() 協程,而 main() 後面已經沒有代碼,就退出該協程,退出它也就意味着整個程式退出,4個任務就沒機會列印結果;
如果 main()協程sleep時間多餘1秒,那麼4個任務先喚醒,就會得到全部的列印結果;
如果main()的18行sleep等于1秒時,和4個任務的sleep時間相同,也會得到全部列印結果。這是為什麼呢?
我猜想是這樣的:4個任務生成在前,第18行的sleep在後,事件循環的消息響應可能有個先進先出的順序。後面深入asyncio的代碼專門研究一下這個猜想正确與否。
示例:
# -*- coding: utf-8 -*-
"""
@File : aio_test.py
@Author : XXX
@Time : 2020/12/25 23:54
"""
import asyncio
import datetime
async def hi(msg=None, sec=None):
print(f'enter hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')
await asyncio.sleep(sec)
print(f'leave hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')
return sec
async def main_1():
print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')
tasks = []
for i in range(1, 5):
tsk = asyncio.create_task(hi(i, i))
tasks.append(tsk)
for tsk in tasks:
ret_val = await tsk
print(f'ret_val:{ret_val}')
print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')
async def main_2():
# ***** 注意:main_2 中睡眠了2秒,導緻睡眠時間大于2秒的協程沒有執行完成 *****
print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')
tasks = []
for i in range(1, 5):
tsk = asyncio.create_task(hi(i, i))
tasks.append(tsk)
await asyncio.sleep(2)
print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')
async def main_3():
# ***** 注意:main_3方法并沒有實作并發執行,隻是順序執行 *****
print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')
tasks = []
for i in range(1, 5):
tsk = asyncio.create_task(hi(i, i))
await tsk
print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')
print('*' * 50)
asyncio.run(main_1())
print('*' * 50)
asyncio.run(main_2())
print('*' * 50)
asyncio.run(main_3())
print('*' * 50)
(3)Future
它是一個低層級的可等待對象,表示一個異步操作的最終結果。目前,我們寫應用程式還用不到它,暫不學習。
asyncio異步IO協程總結
協程就是我們異步操作的片段。通常,寫程式都會把全部功能分成很多不同功能的函數,目的是為了結構清晰;進一步,把那些涉及耗費時間的IO操作(讀寫檔案、資料庫、網絡)的函數通過 async def 異步化,就是異步程式設計。
那些異步函數(協程函數)都是通過消息機制被事件循環管理排程着,整個程式的執行是單線程的,但是某個協程A進行IO時,事件循環就去執行其它協程非IO的代碼。當事件循環收到協程A結束IO的消息時,就又回來執行協程A,這樣事件循環不斷在協程之間轉換,充分利用了IO的閑置時間,進而并發的進行多個IO操作,這就是異步IO。
寫異步IO程式時記住一個準則:需要IO的地方異步。其它地方即使用了協程函數也是沒用的。
不 使用 asyncio 的 消息循環 讓協程運作
先看下 不使用
asyncio 的
消息循環 怎麼 調用 協程,讓協程 運作:
async def func_1():
print("func_1 start")
print("func_1 end")
async def func_2():
print("func_2 start")
print("func_2 a")
print("func_2 b")
print("func_2 c")
print("func_2 end")
f_1 = func_1()
print(f_1)
f_2 = func_2()
print(f_2)
try:
print('f_1.send')
f_1.send(None)
except StopIteration as e:
# 這裡也是需要去捕獲StopIteration方法
pass
try:
print('f_2.send')
f_2.send(None)
except StopIteration as e:
pass
運作結果:
<coroutine object func_1 at 0x0000020121A07C40>
<coroutine object func_2 at 0x0000020121B703C0>
f_1.send
func_1 start
func_1 end
f_2.send
func_2 start
func_2 a
func_2 b
func_2 c
func_2 end
示例代碼2:
async def test(x):
return x * 2
print(test(100))
try:
# 既然是協程,我們像之前yield協程那樣
test(100).send(None)
except BaseException as e:
print(type(e))
ret_val = e.value
print(ret_val)
示例代碼3:
def simple_coroutine():
print('-> start')
x = yield
print('-> recived', x)
sc = simple_coroutine()
next(sc)
try:
sc.send('zhexiao')
except BaseException as e:
print(e)
對上述例子的分析:yield 的右邊沒有表達式,是以這裡預設産出的值是None。剛開始先調用了next(...)是因為這個時候生成器還沒有啟動,沒有停在yield那裡,這個時候也是無法通過send發送資料。是以當我們通過 next(...)激活協程後 ,程式就會運作到x = yield,這裡有個問題我們需要注意, x = yield這個表達式的計算過程是先計算等号右邊的内容,然後在進行指派,是以當激活生成器後,程式會停在yield這裡,但并沒有給x指派。當我們調用 send 方法後 yield 會收到這個值并指派給 x,而當程式運作到協程定義體的末尾時和用生成器的時候一樣會抛出StopIteration異常
如果協程沒有通過 next(...) 激活(同樣我們可以通過send(None)的方式激活),但是我們直接send,會提示如下錯誤:
最先調用 next(sc) 函數這一步通常稱為“預激”(prime)協程 (即,讓協程執行到第一個 yield 表達式,準備好作為活躍的協程使用)。
協程在運作過程中有四個狀态:
- GEN_CREATE: 等待開始執行
- GEN_RUNNING: 解釋器正在執行,這個狀态一般看不到
- GEN_SUSPENDED: 在yield表達式處暫停
- GEN_CLOSED: 執行結束
通過下面例子來檢視協程的狀态:
示例代碼4:(使用協程計算移動平均值)
def averager():
total = 0.0
count = 0
avg = None
while True:
num = yield avg
total += num
count += 1
avg = total / count
# run
ag = averager()
# 預激協程
print(next(ag)) # None
print(ag.send(10)) # 10
print(ag.send(20)) # 15
這裡是一個死循環,隻要不停 send 值 給 協程,可以一直計算下去。
解釋:
- 1. 調用 next(ag) 函數後,協程會向前執行到 yield 表達式,産出 average 變量的初始值 None。
- 2. 此時,協程在 yield 表達式處暫停。
- 3. 使用 send() 激活協程,把發送的值賦給 num,并計算出 avg 的值。
- 4. 使用 print 列印出 yield 傳回的資料。
單步 調試 上面程式。
使用 asyncio 的 消息循環 讓協程運作
使用 asyncio 異步 IO 調用 協程
示例代碼 1:
import asyncio
async def func_1():
print("func_1 start")
print("func_1 end")
# await asyncio.sleep(1)
async def func_2():
print("func_2 start")
print("func_2 a")
print("func_2 b")
print("func_2 c")
print("func_2 end")
# await asyncio.sleep(1)
f_1 = func_1()
print(f_1)
f_2 = func_2()
print(f_2)
# 擷取 EventLoop:
loop = asyncio.get_event_loop()
tasks = [func_1(), func_2()]
# 執行 coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
示例代碼 2:
import asyncio
import time
start = time.time()
def tic():
return 'at %1.1f seconds' % (time.time() - start)
async def gr1():
# Busy waits for a second, but we don't want to stick around...
print('gr1 started work: {}'.format(tic()))
# 暫停兩秒,但不阻塞時間循環,下同
await asyncio.sleep(2)
print('gr1 ended work: {}'.format(tic()))
async def gr2():
# Busy waits for a second, but we don't want to stick around...
print('gr2 started work: {}'.format(tic()))
await asyncio.sleep(2)
print('gr2 Ended work: {}'.format(tic()))
async def gr3():
print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
await asyncio.sleep(1)
print("Done!")
# 事件循環
ioloop = asyncio.get_event_loop()
# tasks中也可以使用 asyncio.ensure_future(gr1())..
tasks = [
ioloop.create_task(gr1()),
ioloop.create_task(gr2()),
ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()
"""
結果:
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Let's do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr2 Ended work: at 2.0 seconds
gr1 ended work: at 2.0 seconds
"""
多個
coroutine
可以封裝成一組 Task 然後并發執行。
-
asyncio.wait(...)
協程的參數是一個由 future 或 協程 構成的可疊代對象;wait 會分别
把各個協程包裝進一個 Task 對象。最終的結果是,wait 處理的所有對象都通過某種方式變成 Future 類的執行個體。wait 是協程函數,是以傳回的是一個協程或生成器對象。
-
方法的參數是一個 future 或 協程。如果是協程,ioloop.run_until_complete
方法與 wait 函數一樣,把協程包裝進一個 Task 對象中。run_until_complete
- 在 asyncio 包中,future和協程關系緊密,因為可以使用 yield from 從 asyncio.Future 對象中産出結果。這意味着,如果 foo 是協程函數(調用後傳回協程對象),抑或是傳回Future 或 Task 執行個體的普通函數,那麼可以這樣寫:res = yield from foo()。這是 asyncio 包的 API 中很多地方可以互換協程與期物的原因之一。 例如上面的例子中 tasks 可以改寫成協程清單:
tasks = [gr1(), gr(2), gr(3)]
詳細的各個類說明,類方法,傳參,以及方法傳回的是什麼類型都可以在官方文檔上仔細研讀,多讀幾遍,方有體會。
示例代碼 3:
import asyncio
import time
import aiohttp
import async_timeout
msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}
urls = [msg.format(i) for i in range(5400, 5500)]
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return response.status
async def main(url):
async with aiohttp.ClientSession() as session:
status = await fetch(session, url)
return status
if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop()
tasks = [main(url) for url in urls]
# 傳回一個清單,内容為各個tasks的傳回值
status_list = loop.run_until_complete(asyncio.gather(*tasks))
print(len([status for status in status_list if status == 200]))
end = time.time()
print("cost time:", end - start)
示例代碼 4:
用
asyncio
實作
Hello world
代碼如下:
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 異步調用 asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 擷取 EventLoop:
loop = asyncio.get_event_loop()
# 執行 coroutine
loop.run_until_complete(hello())
loop.close()
或者直接使用新文法 async 和 await
import asyncio
async def hello():
print("Hello world!")
# 異步調用 asyncio.sleep(1):
r = await asyncio.sleep(1)
print("Hello again!")
# 擷取 EventLoop:
loop = asyncio.get_event_loop()
# 執行 coroutine
loop.run_until_complete(hello())
loop.close()
@asyncio.coroutine
把一個 generator 标記為 coroutine類型,然後,我們就把這個
coroutine
扔到
EventLoop
中執行。
hello()
會首先列印出
Hello world!
,然後,
yield from
文法可以讓我們友善地調用另一個
generator
。由于
asyncio.sleep()
也是一個
coroutine
,是以線程不會等待
asyncio.sleep()
,而是直接中斷并執行下一個消息循環。當
asyncio.sleep()
傳回時,線程就可以從
yield from
拿到傳回值(此處是
None
),然後接着執行下一行語句。
把
asyncio.sleep(1)
看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執行
EventLoop
中其他可以執行的
coroutine
了,是以可以實作并發執行。
我們用 Task 封裝兩個
coroutine
試試:
import threading
import asyncio
async def hello():
print('1 : Hello world! (%s)' % threading.currentThread())
await asyncio.sleep(5)
print('2 : Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
觀察執行過程:
1 : Hello world! (<_MainThread(MainThread, started 12200)>)
1 : Hello world! (<_MainThread(MainThread, started 12200)>)
( 暫停約 5 秒 )
2 : Hello again! (<_MainThread(MainThread, started 12200)>)
2 : Hello again! (<_MainThread(MainThread, started 12200)>)
由列印的目前線程名稱可以看出,兩個
coroutine
是由同一個線程并發執行的。
如果把
asyncio.sleep()
換成真正的IO操作,則多個
coroutine
就可以由一個線程并發執行。
我們用
asyncio
的異步網絡連接配接來擷取 sina、sohu 和 163 的網站首頁:
import asyncio
async def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = await connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
await writer.drain()
while True:
line = await reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
執行結果如下:
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(列印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(列印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(列印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
可見 3 個連接配接 由一個線程通過
coroutine
并發完成。
參考源碼:
async_hello.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_hello.py
async_wget.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_wget.py
示例代碼 5: ( 協程 的 傳回值 )
一個協程裡可以啟動另外一個協程,并等待它完成傳回結果,采用 await 關鍵字
import asyncio
async def outer():
print('in outer')
print('waiting for result1')
result1 = await phase1()
print('waiting for result2')
result2 = await phase2(result1)
return (result1, result2)
async def phase1():
print('in phase1')
return 'result1'
async def phase2(arg):
print('in phase2')
return 'result2 derived from {}'.format(arg)
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(outer())
print('return value: {!r}'.format(return_value))
finally:
event_loop.close()
運作結果:
in outer
waiting for result1
in phase1
waiting for result2
in phase2
return value: ('result1', 'result2 derived from result1')
前面都是關于 asyncio 的例子,那麼除了asyncio,就沒有其他協程庫了嗎?asyncio 作為 python 的标準庫,自然受到很多青睐,但它有時候還是顯得太重量了,尤其是提供了許多複雜的輪子和協定,不便于使用。
你可以了解為,asyncio 是使用 async/await 文法開發的 協程庫,而不是有 asyncio 才能用 async/await,
除了 asyncio 之外,curio 和 trio 是更加輕量級的替代物,而且也更容易使用。
curio 的作者是 David Beazley,下面是使用 curio 建立 tcp server 的例子,據說這是 dabeaz 理想中的一個異步伺服器的樣子:
from curio import run, spawn
from curio.socket import *
async def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
print('Server listening at', address)
async with sock:
while True:
client, addr = await sock.accept()
await spawn(echo_client, client, addr)
async def echo_client(client, addr):
print('Connection from', addr)
async with client:
while True:
data = await client.recv(100000)
if not data:
break
await client.sendall(data)
print('Connection closed')
if __name__ == '__main__':
run(echo_server, ('',25000))
無論是 asyncio 還是 curio,或者是其他異步協程庫,在背後往往都會借助于 IO的事件循環來實作異步,下面用幾十行代碼來展示一個簡陋的基于事件驅動的echo伺服器:
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from selectors import DefaultSelector, EVENT_READ
selector = DefaultSelector()
pool = {}
def request(client_socket, addr):
client_socket, addr = client_socket, addr
def handle_request(key, mask):
data = client_socket.recv(100000)
if not data:
client_socket.close()
selector.unregister(client_socket)
del pool[addr]
else:
client_socket.sendall(data)
return handle_request
def recv_client(key, mask):
sock = key.fileobj
client_socket, addr = sock.accept()
req = request(client_socket, addr)
pool[addr] = req
selector.register(client_socket, EVENT_READ, req)
def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
selector.register(sock, EVENT_READ, recv_client)
try:
while True:
events = selector.select()
for key, mask in events:
callback = key.data
callback(key, mask)
except KeyboardInterrupt:
sock.close()
if __name__ == '__main__':
echo_server(('',25000))
驗證一下:
# terminal 1
$ nc localhost 25000
hello world
hello world
# terminal 2
$ nc localhost 25000
hello world
hello world
現在知道:
- 完成 異步的代碼 不一定要用 async/await ,使用了 async/await 的代碼也不一定能做到異步,
- async/await 是協程的文法糖,使協程之間的調用變得更加清晰,
- 使用 async 修飾的函數調用時會傳回一個協程對象,
- await 隻能放在 async 修飾的函數裡面使用,await 後面必須要跟着一個 協程對象 或 Awaitable,
- await 的目的是等待協程控制流的傳回,而實作暫停并挂起函數的操作是yield。
async/await 以及 協程 是Python未來實作異步程式設計的趨勢,我們将會在更多的地方看到他們的身影,例如協程庫的 curio 和 trio,web 架構的 sanic,資料庫驅動的 asyncpg 等等。在Python 3主導的今天,作為開發者,應該及時擁抱和适應新的變化,而基于async/await的協程憑借良好的可讀性和易用性日漸登上舞台,看到這裡,你還不趕緊上車?
Python 子產品 asyncio – 協程之間的同步
Python 子產品 asyncio – 協程之間的同步:https://www.quxihuan.com/posts/python-module-asyncio-synchronization/
await 和 yield from
Python3.3 的 yield from 文法可以把生成器的操作委托給另一個生成器,生成器的調用方可以直接與子生成器進行通信:
def sub_gen():
yield 1
yield 2
yield 3
def gen():
return (yield from sub_gen())
def main():
for val in gen():
print(val)
# 1
# 2
# 3
利用這一特性,使用 yield from 能夠編寫出類似協程效果的函數調用,在3.5之前,asyncio 正是使用@asyncio.coroutine 和 yield from 文法來建立協程:https://docs.python.org/3.4/library/asyncio-task.html
@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
然而,用 yield from 容易在表示協程和生成器中混淆,沒有良好的語義性,是以在 Python 3.5 推出了更新的 async/await 表達式來作為協程的文法。
是以類似以下的調用是等價的:
async with lock:
...
with (yield from lock):
...
######################
def main():
return (yield from coro())
def main():
return (await coro())
那麼,怎麼把生成器包裝為一個協程對象呢?這時候可以用到 types 包中的 coroutine 裝飾器(如果使用asyncio做驅動的話,那麼也可以使用 asyncio 的 coroutine 裝飾器),@types.coroutine裝飾器會将一個生成器函數包裝為協程對象:
import asyncio
import types
@types.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
盡管兩個函數分别使用了新舊文法,但他們都是協程對象,也分别稱作 native coroutine 以及 generator-based coroutine,是以不用擔心文法問題。
下面觀察一個 asyncio 中 Future 的例子:
import asyncio
future = asyncio.Future()
async def test_1():
await asyncio.sleep(1)
future.set_result('data')
async def test_2():
print(await future)
loop = asyncio.get_event_loop()
tasks_list = [test_1(), test_2()]
loop.run_until_complete(asyncio.wait(tasks_list))
loop.close()
兩個協程在事件循環中,協程 test_1 在執行第一句後挂起自身切到 asyncio.sleep,而協程 test_2 一直等待 future 的結果,讓出事件循環,計時器結束後 test_1 執行第二句并設定了 future 的值,被挂起的 test_2 恢複執行,列印出 future 的結果 'data' 。
future 可以被 await 證明了 future 對象是一個 Awaitable,進入 Future 類的源碼可以看到有一段代碼顯示了 future 實作了__await__ 協定:
class Future:
...
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
當執行 await future 這行代碼時,future中的這段代碼就會被執行,首先future檢查它自身是否已經完成,如果沒有完成,挂起自身,告知目前的 Task(任務)等待 future 完成。
當 future 執行 set_result 方法時,會觸發以下的代碼,設定結果,标記 future 已經完成:
def set_result(self, result):
...
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()
最後 future 會排程自身的回調函數,觸發 Task._step() 告知 Task 驅動 future 從之前挂起的點恢複執行,不難看出,future 會執行下面的代碼:
class Future:
...
def __iter__(self):
...
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
最終傳回結果給調用方。
Yield from
調用協程 的方式有有很多,yield from 就是其中的一種。這種方式在 Python3.3 中被引入,在 Python3.5 中以 async/await 的形式進行了優化。yield from 表達式的使用方式如下:
import asyncio
@asyncio.coroutine
def get_json(client, url):
file_content = yield from load_file('/Users/scott/data.txt')
正如所看到的,yield from 被使用在用 @asyncio.coroutine 裝飾的函數内,如果想把 yield from 在這個函數外使用,将會抛出如下文法錯誤:
File "main.py", line 1
file_content = yield from load_file('/Users/scott/data.txt')
^
SyntaxError: 'yield' outside function
為了避免文法錯誤,yield from 必須在調用函數的内部使用(這個調用函數通常被裝飾為協程)。
Async / await
較新的文法是使用 async/await 關鍵字。 async 從 Python3.5 開始被引進,跟 @asyncio.coroutine 裝飾器一樣,用來聲明一個函數是一個協程。隻要把它放在函數定義之前,就可以應用到函數上,使用方式如下:
async def ping_server(ip):
# ping code here...
實際調用這個函數時,使用 await 而不用 yield from ,當然,使用方式依然差不多:
async def ping_local(ip):
return await ping_server('192.168.1.1')
再強調一遍,跟 yield from 一樣,不能在函數外部使用 await ,否則會抛出文法錯誤。 (譯者注: async 用來聲明一個函數是協程,然後使用 await調用這個協程, await 必須在函數内部,這個函數通常也被聲明為另一個協程)
Python3.5 對這兩種調用協程的方法都提供了支援,但是推薦 async/await 作為首選。
Event Loop
如果你還不知道如何開始和操作一個 Eventloop ,那麼上面有關協程所說的都起不了多大作用。 Eventloop 在執行異步函數時非常重要,重要到隻要執行協程,基本上就得利用 Eventloop 。
Eventloop 提供了相當多的功能:
- 注冊,執行 和 取消 延遲調用(異步函數)
- 建立 用戶端 與 服務端 傳輸用于通信
- 建立 子程式 和 通道 跟 其他的程式 進行通信
- 指定 函數 的 調用 到 線程池
Eventloop 有相當多的配置和類型可供使用,但大部分程式隻需要如下方式預定函數即可:
import asyncio
async def speak_async():
print('OMG asynchronicity!')
loop = asyncio.get_event_loop()
loop.run_until_complete(speak_async())
loop.close()
有意思的是代碼中的最後三行,首先擷取預設的 Eventloop ( asyncio.get_event_loop() ),然後預定和運作異步任務,并在完成後結束循環。
loop.run_until_complete() 函數實際上是阻塞性的,也就是在所有異步方法完成之前,它是不會傳回的。但因為我們隻在一個線程中運作這段代碼,它沒法再進一步擴充,即使循環仍在運作。
可能你現在還沒覺得這有多大的用處,因為我們通過調用其他 IO 來結束 Eventloop 中的阻塞(譯者注:也就是在阻塞時進行其他 IO ),但是想象一下,如果在網頁伺服器上,把整個程式都封裝在異步函數内,到時就可以同時運作多個異步請求了。
也可以将 Eventloop 的線程中斷,利用它去處理所有耗時較長的 IO 請求,而主線程處理程式邏輯或者使用者界面。
一個案例
讓我們實際操作一個稍大的案例。下面這段代碼就是一個非常漂亮的異步程式,它先從 Reddit 抓取 JSON 資料,解析它,然後列印出當天來自 /r/python,/r/programming 和 /r/compsci 的置頂帖。
所示的第一個方法 get_json() ,由 get_reddit_top() 調用,然後隻建立一個 GET 請求到适當的網址。當這個方法和 await 一起調用後, Eventloop 便能夠繼續為其他的協程服務,同時等待 HTTP 響應達到。一旦響應完成, JSON 資料就傳回到 get_reddit_top() ,得到解析并列印出來。
import signal
import sys
import asyncio
import aiohttp
import json
loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)
async def get_json(client, url):
async with client.get(url) as response:
assert response.status == 200
return await response.read()
async def get_reddit_top(subreddit, client):
data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')
j = json.loads(data1.decode('utf-8'))
for i in j['data']['children']:
score = i['data']['score']
title = i['data']['title']
link = i['data']['url']
print(str(score) + ': ' + title + ' (' + link + ')')
print('DONE:', subreddit + '\n')
def signal_handler(signal, frame):
loop.stop()
client.close()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
asyncio.ensure_future(get_reddit_top('python', client))
asyncio.ensure_future(get_reddit_top('programming', client))
asyncio.ensure_future(get_reddit_top('compsci', client))
loop.run_forever()
注意,如果多次運作這段代碼,列印出來的 subreddit 資料在順序上會有些許變化。這是因為每當我們調用一次代碼都會釋放對線程的控制,容許線程去處理另一個 HTTP 調用。這将導緻誰先獲得響應,誰就先列印出來。
結論
即使 Python 内置的異步操作沒有 Javascript 那麼順暢,但這并不意味着就不能用它來把應用變得更有趣、更有效率。隻要花半個小時的時間去了解它的來龍去脈,你就會感覺把異步操作應用到你的程式中将會是多美好的一件事。
aiohttp
asyncio
可以實作單線程并發IO操作。如果僅用在用戶端,發揮的威力不大。如果把
asyncio
用在伺服器端,例如Web伺服器,由于HTTP連接配接就是IO操作,是以可以用 單線程 +
coroutine
實作多使用者的高并發支援。
asyncio
實作了TCP、UDP、SSL等協定,
aiohttp
則是基于
asyncio
實作的 HTTP 架構。
我們先安裝
aiohttp
:pip install aiohttp
然後編寫一個HTTP伺服器,分别處理以下URL:
-
- 首頁傳回/
b'
Index
;'
-
- 根據URL參數傳回文本/hello/{name}
。hello, %s!
代碼如下:
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
注意
aiohttp
的初始化函數
init()
也是一個
coroutine
,
loop.create_server()
則利用
asyncio
建立TCP服務。
參考源碼:aio_web.py :https://github.com/michaelliao/learn-python3/blob/master/samples/async/aio_web.py
一切從爬蟲開始
【續篇】Python 協程之從放棄到死亡再到重生:https://www.secpulse.com/archives/64912.html
從一個簡單的爬蟲開始,這個爬蟲很簡單,通路指定的URL,并且擷取内容并計算長度,這裡我們給定5個URL。第一版的代碼十分簡單,順序擷取每個URL的内容,當第一個請求完成、計算完長度後,再開始第二個請求。
spider_normal.py
# filename: spider_normal.py
import time
import requests
targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]
def spider():
results = {}
for url in targets:
r = requests.get(url)
length = len(r.content)
results[url] = length
return results
def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))
def main():
start_time = time.time()
results = spider()
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(results)
if __name__ == '__main__':
main()
我們多運作幾次看看結果。
大約需要花費14-16秒不等,這段代碼并沒有什麼好看的,我們把關注點放在後面的代碼上。現在我們使用多線程來改寫這段代碼。
# filename: spider_thread.py
import time
import threading
import requests
final_results = {}
targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]
def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))
def spider(url):
r = requests.get(url)
length = len(r.content)
final_results[url] = length
def main():
ts = []
start_time = time.time()
for url in targets:
t = threading.Thread(target=spider, args=(url,))
ts.append(t)
t.start()
for t in ts:
t.join()
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
if __name__ == '__main__':
main()
我們多運作幾次看看結果。
從這兩段代碼中,已經可以看出并發對于處理任務的好處了,但是使用原生的
threading
子產品還是略顯麻煩,Python已經給我們内置了一個處理并發任務的庫
concurrent
,我們借用這個庫修改一下我們的代碼,之是以修改成這個庫的原因還有一個,那就是引出我們後面會談到的
Future
。
# filename: spider_thread.py
import time
from concurrent import futures
import requests
final_results = {}
targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]
def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))
def spider(url):
r = requests.get(url)
length = len(r.content)
final_results[url] = length
return True
def main():
start_time = time.time()
with futures.ThreadPoolExecutor(10) as executor:
res = executor.map(spider, targets)
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
if __name__ == '__main__':
main()
執行一下,會發現耗時與上一個版本一樣,穩定在10s左右。
可以看到我們調用了
concurrent
庫中的
futures
,那麼到底什麼是
futures
?簡單的講,這個對象代表一種異步的操作,可以表示為一個需要延時進行的操作,當然這個操作的狀态可能已經完成,也有可能尚未完成,如果你寫JS的話,可以了解為是類似
Promise
的對象。在Python中,标準庫中其實有兩個
Future
類,一個是
concurrent.futures.Future
,另外一個是
asyncio.Future
,這兩個類很類似,不完全相同,這些實作差異以及API的差異我們先按下暫且不談,有興趣的同學可以參考下相關的文檔。
Future
是我們後面讨論的
asyncio
異步程式設計的基礎,是以這裡多說兩句。
Future
代表的是一個未來的某一個時刻一定會執行的操作(可能已經執行完成了,但是無論如何他一定有一個确切的運作時間),一般情況下使用者無需手動從零開始建立一個Future,而是應當借助架構中的API生成。比如調用
concurrent.futures.Executor.submit()
時,架構會為"異步操作"進行一個排期,來決定何時運作這個操作,這時候就會生成一個
Future
對象。
現在,我們來看看如何使用
asyncio
進行異步程式設計,與多線程程式設計不同的是,多個協程總是運作在同一個線程中的,一旦其中的一個協程發生阻塞行為,那麼整個線程都被阻塞,進而所有的協程都無法繼續運作。
asyncio.Future
和
asyncio.Task
都可以看做是一個異步操作,後者是前者的子類,
BaseEventLoop.create_task()
會接收一個協程作為參數,并且對這個任務的運作時間進行排期,傳回一個
asyncio.Task
類的執行個體,這個對象也是對于協程的一層包裝。如果想擷取
asyncio.Future
的執行結果,應當使用
yield from
來擷取,這樣控制權會被自動交還給EventLoop,我們無需處理"等待
Future
或
Task
運作完成"這個操作。于是就有了一個很愉悅的程式設計方式,如果一個函數A是協程、或傳回
Task
或
Future
的執行個體的函數,就可以通過
result = yield from A()
來擷取傳回值。下面我們就使用
asyncio
和
aiohttp
來改寫我們的爬蟲。
import asyncio
import time
import aiohttp
final_results = {}
targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]
def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))
async def get_content(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.read()
return len(content)
async def spider(url):
length = await get_content(url)
final_results[url] = length
return True
def main():
loop = asyncio.get_event_loop()
cor = [spider(url) for url in targets]
start_time = time.time()
result = loop.run_until_complete(asyncio.gather(*cor))
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
print("loop result: ", result)
if __name__ == '__main__':
main()
結果非常驚人
這裡可能有同學會問為什麼沒看到
yield from
以及
@asyncio.coroutine
,那是因為在Python3.5以後,增加了
async def
和
awiat
文法,等效于
@asyncio.coroutine
和
yield from
,詳情可以參考上一篇文章。在
main()
函數中,我們先擷取一個可用的事件循環,緊接着将生成好的協程任務添加到這個循環中,并且等待執行完成。在每個
spider()
中,執行到
await
的時候,會交出控制權(如果不明白請向前看一下委托生成器的部分),并且切到其他的協程繼續運作,等到
get_content()
執行完成傳回後,那麼會恢複
spider()
協程的執行。
get_content()
函數中隻是通過
async with
調用
aiohttp
庫的最基本方法擷取頁面内容,并且傳回了長度,僅此而已。
在修改為協程版本後,爬蟲性能有了巨大的提升,從最初了15s,到10s,再到現在的2s左右,簡直是質的飛躍。這隻是一個簡單的爬蟲程式,相比多線程,性能提高了近5倍,如果是其他更加複雜的大型程式,也許性能提升會更多。
asyncio
這套異步程式設計架構,通過簡單的事件循環以及協程機制,在需要等待的情況下主動交出控制權,切換到其他協程進行運作。到這裡就會有人問,為什麼要将
requests
替換為
aiohttp
,能不能用
requests
?答案是不能,還是我們前面提到過的,在協程中,一切操作都要避免阻塞,禁止所有的阻塞型調用,因為所有的協程都是運作在同一個線程中的!
requests
庫是阻塞型的調用,當在等待I/O時,并不能将控制權轉交給其他協程,甚至還會将目前線程阻塞,其他的協程也無法運作。如果你在異步程式設計的時候需要用到一些其他的異步元件,可以到https://github.com/aio-libs/這裡找找,也許就有你需要的異步庫。
關于
asyncio
的異步程式設計資料目前來說還不算很多,官方文檔應該算是相當不錯的參考文獻了,其中非常推薦的兩部分是:Develop with asyncio和Tasks and coroutines,各位同學有興趣的話可以自行閱讀。
asyncio
這個異步架構中包含了非常多的内容,甚至還有
TCP Server/Client
的相關内容,如果想要掌握
asyncio
這個異步程式設計架構,還需要多加練習。順帶一提,
asyncio
非常容易與其他的架構整合,例如
tornado
已經有實作了
asyncio.AbstractEventLoop
的接口的類
AsyncIOMainLoop
,還有人将
asyncio
內建到QT的事件循環中了,可以說是非常的靈活了。
Python 協程總結
Python 之是以能夠處理網絡 IO 高并發,是因為借助了高效的IO模型,能夠最大限度的排程IO,然後事件循環使用協程處理IO,協程遇到IO操作就将控制權抛出,那麼在IO準備好之前的這段事件,事件循環就可以使用其他的協程處理其他事情,然後協程在使用者空間,并且是單線程的,是以不會像多線程,多程序那樣頻繁的上下文切換,因而能夠節省大量的不必要性能損失。
注: 不要再協程裡面使用time.sleep之類的同步操作,因為協程再單線程裡面,是以會使得整個線程停下來等待,也就沒有協程的優勢了
了解
協程,又稱為微線程,看上去像是子程式,但是它和子程式又不太一樣,它在執行的過程中,可以在中斷目前的子程式後去執行别的子程式,再傳回來執行之前的子程式,但是它的相關資訊還是之前的。
優點:
- 極高的執行效率,因為子程式切換而不是線程切換,沒有了線程切換的開銷;
- 不需要多線程的鎖機制,因為隻有一個線程在執行;
如果要充分利用CPU多核,可以通過使用多程序+協程的方式
使用
打開 asyncio 的源代碼,可以發現asyncio中的需要用到的檔案如下:
下面的則是接下來要總結的檔案
檔案 | 解釋 |
---|---|
base_events | 基礎的事件,提供了BaseEventLoop事件 |
coroutines | 提供了封裝成協程的類 |
events | 提供了事件的抽象類,比如BaseEventLoop繼承了AbstractEventLoop |
futures | 提供了Future類 |
tasks | 提供了Task類和相關的方法 |
coroutines
函數 | 解釋 |
---|---|
coroutine(func) | 為函數加上裝飾器 |
iscoroutinefunction(func) | 判斷函數是否使用了裝飾器 |
iscoroutine(obj) | 判斷該對象是否是裝飾器 |
如果在函數使用了
coroutine
裝飾器,就可以通過
yield from
去調用
async def
聲明的函數,如果已經使用
async def
聲明,就沒有必要再使用裝飾器了,這兩個功能是一樣的。
import asyncio
@asyncio.coroutine
def hello_world():
print("Hello World!")
async def hello_world2():
print("Hello World2!")
print('------hello_world------')
print(asyncio.iscoroutinefunction(hello_world))
print('------hello_world2------')
print(asyncio.iscoroutinefunction(hello_world2))
print('------event loop------')
loop = asyncio.get_event_loop()
# 一直阻塞該函數調用到函數傳回
loop.run_until_complete(hello_world())
loop.run_until_complete(hello_world2())
loop.close()
上面的代碼分别使用到了
coroutine
裝飾器和
async def
,其運作結果如下:
------hello_world------
True
------hello_world2------
True
------event loop------
Hello World!
Hello World2!
注意:不可以直接調用協程,需要一個
event loop
去調用。
如果想要在一個函數中去得到另外一個函數的結果,可以使用
yield from
或者
await
,例子如下:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
函數
print_sum
會一直等到函數
compute
傳回結果,執行過程如下:
base_events
這個檔案裡面漏出來的隻有
BaseEventLoop
一個類,它的相關方法如下:
函數 | 解釋 |
---|---|
create_future() | 建立一個future對象并且綁定到事件上 |
create_task() | 建立一個任務 |
run_forever() | 除非調用stop,否則事件會一直運作下去 |
run_until_complete(future) | 直到 future 對象執行完畢,事件才停止 |
stop() | 停止事件 |
close() | 關閉事件 |
is_closed() | 判斷事件是否關閉 |
time() | 傳回事件運作時的時間 |
call_later(delay, callback, *args) | 設定一個回調函數,并且可以設定延遲的時間 |
call_at(when, callback, *args) | 同上,但是設定的是絕對時間 |
call_soon(callback, *args) | 馬上調用 |
events
函數 | 解釋 |
---|---|
get_event_loop() | 傳回一個異步的事件 |
... | ... |
傳回的就是BaseEventLoop的對象。
future
Future類的相關方法如下:
方法 | 解釋 |
---|---|
cancel() | 取消掉future對象 |
cancelled() | 傳回是否已經取消掉 |
done() | 如果future已經完成則傳回true |
result() | 傳回future執行的結果 |
exception() | 傳回在future中設定了的exception |
add_done_callback(fn) | 當future執行時執行回調函數 |
remove_done_callback(fn) | 删除future的所有回調函數 |
set_result(result) | 設定future的結果 |
set_exception(exception) | 設定future的異常 |
設定 future 的例子如下:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1) # 睡眠
future.set_result('Future is done!') # future設定結果
loop = asyncio.get_event_loop()
future = asyncio.Future() # 建立future對象
asyncio.ensure_future(slow_operation(future)) # 建立任務
loop.run_until_complete(future) # 阻塞直到future執行完才停止事件
print(future.result())
loop.close()
run_until_complete
方法在内部通過調用了future的
add_done_callback
,當執行future完畢的時候,就會通知事件。
下面這個例子則是通過使用future的
add_done_callback
方法實作和上面例子一樣的效果:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
def got_result(future):
print(future.result())
loop.stop() # 關閉事件
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result) # future執行完畢就執行該回調
try:
loop.run_forever()
finally:
loop.close()
一旦
slow_operation
函數執行完畢的時候,就會去執行
got_result
函數,裡面則調用了關閉事件,是以不用擔心事件會一直執行。
task
Task類是Future的一個子類,也就是Future中的方法,task都可以使用,類方法如下:
方法 | 解釋 |
---|---|
current_task(loop=None) | 傳回指定事件中的任務,如果沒有指定,則預設目前事件 |
all_tasks(loop=None) | 傳回指定事件中的所有任務 |
cancel() | 取消任務 |
并行執行三個任務的例子:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
loop.close()
執行結果為
Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24
可以發現,ABC同時執行,直到future執行完畢才退出。
下面一些方法是和task相關的方法
方法 | 解釋 |
---|---|
as_completed(fs, *, loop=None, timeout=None) | 傳回是協程的疊代器 |
ensure_future(coro_or_future, *, loop=None) | 排程執行一個 coroutine object:并且它封裝成future。傳回任務對象 |
async(coro_or_future, *, loop=None) | 丢棄的方法,推薦使用ensure_future |
wrap_future(future, *, loop=None) | Wrap a concurrent.futures.Future object in a Future object. |
gather(*coros_or_futures, loop=None, return_exceptions=False) | 從給定的協程或者future對象數組中傳回future彙總的結果 |
sleep(delay, result=None, *, loop=None) | 建立一個在給定時間(以秒為機關)後完成的協程 |
shield(arg, *, loop=None) | 等待future,屏蔽future被取消 |
wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED) | 等待由序列futures給出的Futures和協程對象完成。協程将被包裹在任務中。傳回含兩個集合的Future:(done,pending) |
wait_for(fut, timeout, *, loop=None) | 等待單個Future或coroutine object完成逾時。如果逾時為None,則阻止直到future完成 |