天天看點

python協程實時輸出_Python協程&asyncio&異步程式設計

Python協程&asyncio&異步程式設計

1.協程

協程是微線程,是一種使用者态上下文切換技術,通過一個線程實作代碼塊互相切換執行

實作協程有這麼幾種方法:

greenlet,早期的子產品

yield 關鍵字

asyncio python3.4引入的

async、await關鍵字 python3.5 主流[推薦]

1.1 greenlet實作協程

pip install greenlet

# -*- coding: utf-8 -*-

from greenlet import greenlet

def func1():

print(1) # 第1步:輸出1

gr2.switch() # 第2步:跳到func2函數

print(2) # 第5步:輸出2

gr2.switch() # 第6步:跳到func2函數

def func2():

print(3) # 第3步:輸出3

gr1.switch() # 第4步:跳到func1函數

print(4) # 第7步:輸出4

gr1 = greenlet(func1)

gr2 = greenlet(func2)

gr1.switch() # 第1步:去執行func1函數

1.2 yield關鍵字

# -*- coding: utf-8 -*-

def func1():

yield 1

yield from func2()

yield 2

def func2():

yield 3

yield 4

f1 = func1()

for item in f1:

print(item)

1.3 asyncio

Python3.4以及之後

# -*- coding: utf-8 -*-

import asyncio

@asyncio.coroutine

def func1():

print(1)

yield from asyncio.sleep(2) # 遇到IO耗時操作時,自動切換到tasks中的其它任務

print(2)

@asyncio.coroutine

def func2():

print(3)

yield from asyncio.sleep(2) # 遇到IO耗時操作時,自動切換到tasks中的其它任務

print(4)

tasks = [

asyncio.ensure_future(func1()),

asyncio.ensure_future(func2()),

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

注意:遇到IO阻塞自動切換

1.4 async和await關鍵字

Python3.5及以後

# -*- coding: utf-8 -*-

import asyncio

async def func1():

print(1)

await asyncio.sleep(2) # 遇到IO耗時操作時,自動切換到tasks中的其它任務

print(2)

async def func2():

print(3)

await asyncio.sleep(2) # 遇到IO耗時操作時,自動切換到tasks中的其它任務

print(4)

tasks = [

asyncio.ensure_future(func1()),

asyncio.ensure_future(func2()),

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

1.5 一般用greenlet或者async&await關鍵字

2.協程的意義

在一個線程中遇到IO等待時間,不讓線程一直白白等待,而是讓線程利用空閑時間去點其它的事

# -*- coding: utf-8 -*-

import asyncio

import aiohttp

async def fetch(session, url):

print("發送請求")

async with session.get(url, verify_ssl=False) as response:

content = await response.content.read()

file_name = url.rsplit("/")[-1]

with open(file_name, mode="wb") as file_object:

file_object.write(content)

print("下載下傳完成", url)

async def main():

async with aiohttp.ClientSession() as session:

url_list = [

"http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxxa59lyj30kk10p77m.jpg",

"http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlhlhaj30kk0dpmxj.jpg",

"http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlrw4tj30kk0pp78u.jpg"

]

tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

await asyncio.wait(tasks)

if __name__ == "__main__":

asyncio.run(main())

3.異步程式設計

3.1 事件循環

了解成為一個死循環

"""

# 僞代碼

任務清單 = [任務1, 任務2, 任務3,...]

while True:

可執行的任務清單,已完成的任務清單 = 去任務清單中檢查所有任務,将'可執行'和'已完成'的任務傳回

for 就緒任務 in 可執行的任務清單:

執行就緒任務

for 已完成任務 in 已完成的任務清單:

在任務清單中 移除已完成任務

任務清單中的任務都已完成 則終止循環

"""

import asyncio

# 去生成或擷取一個事件循環

loop = asyncio.get_event_loop()

# 把任務放到任務清單

loop.run_until_complete(asyncio.wait(tasks))

3.2 快速上手

協程函數 定義函數的時候 async def 函數名

協程對象 執行協程函數()得到協程對象

async def func():

pass

result = func()

注意:運作協程函數得到協程對象,函數内部代碼不會執行

要運作協程函數内部代碼,必須要将協程對象交給事件循環來處理

import asyncio

async def func():

print("Hello World")

result = func()

# 去生成或擷取一個事件循環

loop = asyncio.get_event_loop()

# 把任務放到任務清單

loop.run_until_complete(result)

# asyncio.run(result) Python3.7之後,這句可以代替上面兩句

3.3 await

await + 可等待對象(協程對象、Future、Task對象、-->IO等待)

示例1:

import asyncio

async def func():

print("start")

response = await asyncio.sleep(2)

print("end", response)

asyncio.run(func)

示例2:

# -*- coding: utf-8 -*-

import asyncio

async def others():

print("start")

await asyncio.sleep(2)

print("end")

return "傳回值"

async def func():

print("開始執行func")

response = await others()

print("IO請求結束,結果為: ", response)

asyncio.run(func())

示例3:

# -*- coding: utf-8 -*-

import asyncio

async def others():

print("start")

await asyncio.sleep(2)

print("end")

return "傳回值"

async def func():

print("開始執行func")

response1 = await others()

print("IO請求結束,結果為: ", response1)

response2 = await others()

print("IO請求結束,結果為: ", response2)

asyncio.run(func())

await就是等待對象的值得到對應的結果之後再繼續往下走

3.4 Task對象

在事件循環中添加多個任務

Tasks用語并發排程協程,通過asyncio.create_task(協程對象),這樣可以讓協程加入事件循環中等待被排程執行。除了使用asyncio.create_task(協程對象)還可以使用較低級的loop.create_task()或ensure_future()函數。不建議手動執行個體化Task對象。

注意:asyncio.create_task(協程對象)是在Python3.7時加入的。在Python3.7前可以使用較低級的asyncio.ensure_future()函數。

示例1:

import asyncio

async def func():

print("1")

await asyncio.sleep(2)

print("2")

return "傳回值"

async def main():

print("開始執行main")

# 建立Task對象,将目前執行func函數任務添加到事件循環

task1 = asyncio.create_task(func())

task2 = asyncio.create_task(func())

print("main結束")

# 當執行協程遇到IO操作時,會自動化切換到其它任務(task2)

# 此處的await是等待相對應的協程全部執行完畢并擷取結果

ret1 = await task1

ret2 = await task2

print(ret1, ret2)

asyncio.run(main())

示例2:

# -*- coding: utf-8 -*-

import asyncio

async def func():

print("1")

await asyncio.sleep(2)

print("2")

return "傳回值"

async def main():

print("開始執行main")

# 建立Task對象,将目前執行func函數任務添加到事件循環

task_list = [

asyncio.create_task(func(), name="n1"),

asyncio.create_task(func(), name="n2")

]

print("main結束")

# 當執行協程遇到IO操作時,會自動化切換到其它任務(task2)

# 此處的await是等待相對應的協程全部執行完畢并擷取結果

# done是集合,是上面兩個任務的傳回值

done, pending = await asyncio.wait(task_list, timeout=None)

print(done)

asyncio.run(main())

示例3:

import asyncio

async def func():

print("1")

await asyncio.sleep(2)

print("2")

return "傳回值"

task_list = [

func(),

func()

]

done, pending = asyncio.run(asyncio.wait(task_list))

print(done)

3.5 asyncio.Future對象

Task繼承Future,Task對象内部的await結果處理基于Future對象來着的

示例1:

import asyncio

async def main():

# 擷取目前事件循環

loop = asyncio.get_running_loop()

# 建立一個future對象,這個對象什麼都不幹

fut = asyncio.create_future()

# 等待任務最終結果(Future對象),沒有結果則會一直等下去

await fut

asyncio.run(main())

示例2:

import asyncio

async def set_after(fut):

await asyncio.sleep(2)

fut.set_result("success")

async def main():

# 擷取目前事件循環

loop = asyncio.get_running_loop()

# 建立一個future對象,這個對象什麼都不幹,沒綁定任何行為,則這個任務不知道什麼時候結束

fut = asyncio.create_future()

# 建立一個任務(Task對象),綁定了set_after函數,函數在2s之後,會給fut指派

# 即手動給fut設定結果,那麼fut就可以結束了

await loop.create_task(set_after(fut))

# 等待任務最終結果(Future對象),沒有結果則會一直等下去

data = await fut

print(data)

asyncio.run(main())

3.6 concurrent.future.Future對象

使用線程池或程序池實作異步操作時用到的對象

交叉使用,協程異步程式設計 + MySQL(不支援)[線程、程序做異步程式設計]

import time

import asyncio

import concurrent.futures

def func():

# 某個耗時操作

time.sleep(2)

return "success"

async def main():

loop = asyncio.get_running_loop()

# 1.Run in the default loop's excutor (預設ThreadPoolExecutor)

# 第一步:内部會先調用ThreadPoolExecutor的submit方法去線程池中申請一個線程去執行func函數,并傳回一個concurrent.futures.Future對象

# 第二步:調用asyncio.wrap_future将concurrent.futures.Future對象包裝為調用asyncio.Future對象

# 因為concurrent.futures.Future對象不支援await文法,需要包裝為asyncio.Future對象才能使用

fut = loop.run_in_excutor(None, func)

result = await fut

print("default thread pool", result)

# 2.Run in a custom thread pool

with concurrent.futures.ThreadPoolExecutor() as pool:

result = await loop.run_in_excutor(pool, func)

print("custom thread pool", result)

# 3.Run in a custom process pool

with concurrent.futures.ProcessPoolExecutor() as pool:

result = await loop.run_in_excutor(pool, func)

print("custom process pool", result)

asyncio.run(main())

案例:asyncio + 不支援的子產品

import asyncio

import requests

async def download_image(url):

# f發送網絡請求下載下傳圖檔,(遇到下載下傳圖檔的網絡IO請求,自動切換到其它任務)

print("開始下載下傳", url)

loop = asyncio.get_running_loop()

# requests子產品不支援異步操作,是以就使用線程池來配合實作了

future = loop.run_in_executor(None, requests.get, url)

response = await future

print("下載下傳完成")

# 圖檔儲存到本地檔案

file_name = url.rsplit("/")[-1]

with open(file_name, mode="wb") as file_object:

file_object.write(response.content)

def main():

url_list = [

"http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxxa59lyj30kk10p77m.jpg",

"http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlhlhaj30kk0dpmxj.jpg",

"http://ww1.sinaimg.cn/mw600/00745YaMgy1gedxrlrw4tj30kk0pp78u.jpg"

]

tasks = [download_image(url) for url in url_list]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

if __name__ == "__main__":

main()

3.7 異步疊代器

什麼是異步疊代器

實作了__aiter__()和__anext__()方法的對象。anext__必須傳回一個awaitable對象,async for 會處理異步疊代器的__anext()方法所傳回的可等待對象,知道其引發一個StopAsyncIteration異常。

什麼是異步可疊代對象

可在async for 中使用的對象。必須通過它的__aiter__()傳回一個asynchronous iterator

import asyncio

class Reader(object):

"""自定義異步疊代器(同時也是異步可疊代對象)"""

def __init__(self):

self.count = 0

async def readline(self):

# await asyncio.sleep(2)

self.count += 1

if self.count == 100:

return None

return self.count

def __aiter__(self):

return self

async def __anext__(self):

val = await self.readline()

if val == None:

raise StopAsyncIteration

return val

async def func():

obj = Reader()

# async for隻能寫在協程函數内

async for item in obj:

print(item)

asyncio.run(func())

3.8 異步上下文管理器

此種對象通過定義__aenter__()方法和__aexit__()方法來對async with語句中的環境進行控制

import asyncio

class AsyncContexManager(object):

def __init__(self, ):

self.conn = conn

async def do_something(self):

# 異步操作資料庫

return 666

async def __aenter__(self):

# 異步連接配接資料庫

self.conn = await asyncio.sleep(2)

return self

async def __anext__(self):

# 異步關閉資料庫

await asyncio.sleep(2)

async def func():

# async with 隻能在協程函數中使用

async with AsyncContexManager() as f:

result = await f.do_something()

print(result)

asyncio.run(func())

4.uvloop

是asyncio的事件循環的替代方案。事件循環 > asyncio的事件循環,性能比肩go

pip install uvloop

import asyncio

import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy)

# 編寫asyncio的代碼,與之前寫的代碼一緻

# 内部的事件循環會自動變化為uvloop

asyncio.run(...)

注意:一個asgi-->uvicorn 内部使用的是uvloop

5.實戰案例

5.1 異步操作redis

在使用Python代碼操作redis,連接配接、操作、斷開都是網絡IO

pip install aioredis

示例1:

import asyncio

import aioredis

async def execute(address, password):

print("開始執行", address)

# 網絡IO 建立redis連接配接

redis = await aioredis.create_redis(address, password=password)

# 網絡IO 在redis中設定哈希值car,内部設定3個鍵值對 redis = {"car": {"key1": 1, "key2": 2, "key3": 3}}

await redis.hmset_dict("car", key1=1, key2=2, key3=3)

# 網絡IO 去redis中擷取值

result = redis.hgetall("car", encoding="utf-8")

print(result)

redis.close()

# 網絡IO關閉redis連接配接

await redis.wait_close()

print("結束", address)

asyncio.run(execute("redis://127.0.0.1:6379", "123456"))

示例2:

import asyncio

import aioredis

async def execute(address, password):

print("開始執行", address)

# 網絡IO 先去連接配接47.93.4.197:6379,遇到IO則自動切換任務,去連接配接47.93.4.198:6379

redis = await aioredis.create_redis(address, password=password)

# 網絡IO 遇到IO會自動切換任務

await redis.hmset_dict("car", key1=1, key2=2, key3=3)

# 網絡IO 遇到IO會自動切換任務

result = redis.hgetall("car", encoding="utf-8")

print(result)

redis.close()

# 網絡IO 遇到IO會自動切換任務

await redis.wait_close()

print("結束", address)

task_list = [

execute("47.93.4.197:6379", "123456"),

execute("47.93.4.198:6379", "123456"),

]

asyncio.run(asyncio.wait(task_list))

5.1 異步操作MySQL

pip install aiomysql

示例1:

import asyncio

import aiomysql

async def execute():

print("開始執行")

# 網絡IO 連接配接MySQL

conn = await aiomysql.connect(host="127.0.0.1", port=3306, user="root", password="123456", db="mysql")

# 網絡IO 建立cursor

cur = await conn.cursor()

# 網絡IO 執行sql

await cur.execute("seletc name from user")

# 網絡IO 擷取sql結果

result = await cur.fetchall()

print(result)

# 網絡IO 關閉連接配接

await cur.close()

conn.close()

asyncio.run(execute())

示例2:

import asyncio

import aiomysql

async def execute(host, password):

print("開始執行", host)

# 網絡IO 連接配接MySQL先去連接配接47.93.41.197,遇到IO則切換去連接配接47.93.41.198

conn = await aiomysql.connect(host=host, port=3306, user="root", password=password, db="mysql")

# 網絡IO 遇到IO會自動切換任務

cur = await conn.cursor()

# 網絡IO 遇到IO會自動切換任務

await cur.execute("seletc name from user")

# 網絡IO 遇到IO會自動切換任務

result = await cur.fetchall()

print(result)

# 網絡IO 遇到IO會自動切換任務

await cur.close()

conn.close()

print("結束", host)

task_list = [

execute("47.93.41.197", "123456"),

execute("47.93.41.198", "123456")

]

asyncio.run(asyncio.wait(task_list))

5.3 FastAPI架構

安裝

pip install fastapi

pip install uvicorn # (asgi 可以認為是支援異步的wsgi,内部基于uvloop)

示例:mu.py

import asyncio

import aioredis

import uvicorn

from fastapi import FastAPI

from aioredis import Redis

app = FastAPI()

REDIS_POOL = aioredis.ConnectionsPool("redis://47.193.14.198:6379", password="123", minsize=1, maxsize=10)

@app.get("/")

def index():

"""普通操作接口"""

return {"msg": "hello world"}

@app.get("/red")

async def red():

# 異步操作接口

print("請求來了")

await asyncio.sleep(3)

# 連接配接池擷取一個連接配接

conn = await REDIS_POOL.acquire()

redis = Redis(conn)

# 設定值

await redis.hmset_dict("car", key1=1, key2=2, key3=3)

# 讀取值

result = await redis.hgetall("car", encoding="utf-8")

print(result)

# 連接配接歸還連接配接池

REDIS_POOL.release(conn)

return result

if __name__ == "__main__":

uvicorn.run("mu:app", host="127.0.0.1", port=5000, log_level="info")

5.4爬蟲

pip install aiohttp

import asyncio

import aiohttp

async def fetch(session, url):

print("發送請求", url)

async with session.get(url, verify_ssl=False) as response:

text = await response.text()

print("結果:", url, len(text))

return text

async def main():

async with aiohttp.ClientSession() as session:

url_list = [

"https://python.org",

"https://www.baidu.com",

"https://tianbaoo.github.io"

]

tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

done, pending = await asyncio.wait(tasks)

print(done)

if __name__ == "__main__":

asyncio.run(main())

6.總結

最大的意義:通過一個線程利用其IO等待時間去做其他事情。

來源:oschina

連結:https://my.oschina.net/u/4409292/blog/4265011