概念
協程(Coroutine):是單線程下的并發,又稱微線程,纖程。
協程是一種使用者态的輕量級線程,即協程是由使用者程式自己控制排程的,實作單線程下的并發。
子程式就是協程的一種特例
子程式的概念:程式可以互相調用,就是說,在一段程式a的中間,可以設定它執行另外的一段程式b,然後再回來繼續執行本段程式a後邊的部分,在其中調用的那另外的一段程式b就是這一程式a的子程式。
注意:
python的線程屬于核心級别的,即由作業系統控制排程的搶占式多任務(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運作)
單線程内開啟協程,一旦遇到io,就會從應用程式級别(而非作業系統)控制切換,以此來提升效率(非io操作的切換與效率無關)
優點:
協程的切換開銷更小,屬于程式級别的切換,作業系統完全感覺不到,因而更加輕量級
不需要多線程的鎖機制,因而效率更高
單線程内就可以實作并發的效果,最大限度地利用cpu
缺點:
協程的本質是單線程下,無法利用多核,但可以通過開啟多程序,每個程序内開啟多個線程,每個線程内開啟協程
協程指的是單個線程,因而一旦協程出現阻塞,将會阻塞整個線程
協程特點:
必須在隻有一個單線程裡實作并發
修改共享資料不需加鎖
協程擁有自己的寄存器上下文和棧
一個協程遇到IO操作自動切換到其它協程
實作原理:
協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧。是以協程能保留上一次調用時的狀态,即所有局部狀态的一個特定組合,每次過程重入時,就相當于進入上一次調用的狀态。
Python對協程的支援是通過generator實作的。
在generator中,我們不但可以通過for循環來疊代,還可以不斷調用next()函數擷取由yield語句傳回的下一個值。
但是Python的yield不但可以傳回一個值,它還可以接收調用者發出的參數。
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming%s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing%s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return:%s' % r)
c.close()
c = consumer()
produce(c)
子產品:
python中對于協程有兩個子產品,greenlet和gevent,但更适用于Unix/Linux下
Greenlet(greenlet的執行順序需要我們手動控制)
gevent(自動切換,由于切換是在IO操作時自動完成,是以gevent需要修改Python自帶的一些标準庫,這一過程在啟動時通過monkey patch完成)
asyncio
gevent多被用在python3以前,對windows的相容性不佳,而且這幾年都沒更新了,是以就使用asyncio子產品來實作協程
python的asyncio子產品可以實作異步網絡操作、并發、協程。當一個任務需要等待IO結果的時候,可以挂起目前任務,轉而去執行其他任務。類似于JS單線程,通過Promise對象,async和wait方法來實作異步執行。使用await可以将耗時等待的操作挂起,讓出控制權,進而實作異步處理。如果一個對象可以在 await語句中使用,那麼它就是 可等待對象 。
可等待對象有三種主要類型: 協程(coroutine), 任務(task) 和 Future
協程函數: 定義形式為 async def 的函數;
協程對象: 調用協程函數所傳回的對象
import asyncio
async def do_some_work(x):
print("waiting:",x)
# 當協程執行的時候遇到 await,時間循環就會将本協程挂起,轉而去執行别的協程,直到其他的協程挂起或執行完畢
await asyncio.sleep(x)
return "Done after {}s".format(x)
def callback(task):
print('Status:', task.result())
# 使用async關鍵字定義的函數就是一個協程對象
coroutine = do_some_work(2) #
# 建立一個事件loop
loop = asyncio.get_event_loop() #<_WindowsSelectorEventLoop running=False closed=False debug=False>
# stop(),run_forever(),close(),...
# 建立一個task對象,task是Future類的子類,對 coroutine 對象的進一步封裝。
# task對象儲存了協程運作後的狀态(running、finished),用于未來擷取協程的結果。
# 三種建立task的方式:
task = loop.create_task(coroutine) #
task = asyncio.ensure_future(coroutine) #低層級
task = asyncio.create_task(coroutine) #高層級,python3.7+
# 為task綁定回調方法
task.add_done_callback(callback)
# cancelled(),done(),result(),exception(),remove_done_callback(callback),...
# 将協程注冊到事件循環 loop 中,然後啟動
loop.run_until_complete(task) # 與task.result()的結果一緻
# 如果沒有顯式地聲明task,那麼該方法會将coroutine封裝成task對象 #loop.run_until_complete(coroutine)
多任務并行
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
并發運作 aws 指定的 可等待對象 ,并阻塞線程直到滿足 return_when 指定的條件(FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED),傳回兩個 Task/Future 集合: (done, pending)。
# 将任務先放置在任務隊列中,然後将任務隊列傳給 `asynicio.wait` 方法,就可以同時并行運作隊列中的任務
tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]
loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(asyncio.wait(tasks))
for task in dones:
print("Task ret:", task.result())
嵌套協程
即一個協程中await了另外一個協程
async def main():
tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]
return await asyncio.gather(*tasks) #按清單順序傳回
# return await asyncio.wait(tasks) #不按順序
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
print("Task ret:",result)
async def main():
tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]
for task in asyncio.as_completed(tasks): #as_complete
result = await task
print("Task ret: {}".format(result))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
協程停止
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
aiohttp
僅僅将涉及 IO 操作的代碼封裝到 async 修飾的方法裡面是不可行的,不是真正的異步執行!我們必須要使用支援異步操作的請求方式才可以實作真正的異步,是以這裡就需要 aiohttp 派上用場了
asyncio可以實作單線程并發IO操作。如果僅用在用戶端,發揮的威力不大。如果把asyncio用在伺服器端,例如Web伺服器,由于HTTP連接配接就是IO操作,是以可以用單線程+coroutine實作多使用者的高并發支援。
asyncio實作了TCP、UDP、SSL等協定,aiohttp則是基于asyncio實作的HTTP架構。
# 用aiohttp封裝好的阻塞API,異步運作一個簡單的伺服器
import asyncio
from aiohttp import web
routes = web.RouteTableDef()
@routes.get('/')
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'
Hello Word!
', content_type='text/html', charset='utf-8')
@routes.get('/data')
async def json_data(request):
await asyncio.sleep(2)
return web.json_response({
'name': 'start a server',
'info':{
"author":"MerelySmile",
"date":"2019"
}
})
def init():
app = web.Application()
app.add_routes(routes)
web.run_app(app)
# run_app()提供了一個簡單的阻塞API來運作Application,用于異步啟動Application
# 監聽http://0.0.0.0:8080,server位址是localhost:8080
init()
# 原生的方法,執行個體化一個AppRunner
# 官方文檔:https://aiohttp.readthedocs.io/en/stable/web_advanced.html#aiohttp-web-app-runners
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'
Index
', content_type='text/html', charset='utf-8')
async def hello(request):
await asyncio.sleep(0.5)
text = '
hello,%s!
' % request.match_info['name']
return web.Response(body=text.encode('utf-8'),content_type='text/html')
async def init():
app = web.Application()
app.router.add_get('/', index)
app.router.add_get('/hello/{name}', hello)
runner = web.AppRunner(app) # app.make_handler()方法過期了,改用APPrunner
await runner.setup()
site = web.TCPSite(runner, '127.0.0.1', 8000)
await site.start()
print('Server started at http://127.0.0.1:8000...')
return site
loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()
# 基本使用
import aiohttp
url = 'https://api.github.com/some/endpoint'
headers = {'content-type': 'application/json'}
conn = aiohttp.ProxyConnector(proxy="http://some.proxy.com")
async with aiohttp.ClientSession(connector=conn) as session:
async with session.get(url,headers=headers,timeout=30) as resp:
assert r.status == 200
print(await resp.text())
異步請求(asyncio+aiohttp)
# 一個簡單的執行個體
import asyncio
import aiohttp
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
return result
async def request():
url = 'http://127.0.0.1:8080'
print('Waiting for', url)
result = await get(url)
print('Get response from', url, 'Result:', result)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
多程序+協程
因為協程是一個線程執行,通過多程序+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。
使用aiomultiprocess子產品(python3.6+),基于 multiprocessing 和 asyncio 實作
看了一會源碼,debug了兩遍,暈了呀,瞎猜一下吧,大概就是通過multiprocessing來建立一個上下文Contexts,執行個體化Manager來共享狀态,使用Queue來實作程序間的通信。建立繼承自Process的工作程序PoolWorker,注冊loop來儲存每個worker,就相當于将協程注冊到事件循環 loop 中,然後再建立task來儲存協程的執行結果。點到為止了,真想要看懂的話,還需要去推敲一下multiprocessing.managers。
# 程序池 + 異步協程
import asyncio
import aiohttp
from aiomultiprocess import Pool
async def get(url):
print('Waiting for', url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
print('Result:', result)
return result
async def request():
url = 'http://127.0.0.1:8080'
urls = [url for _ in range(5)]
async with Pool() as pool:
result = await pool.map(get, urls)
return result
if __name__ == '__main__':
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)