概念
协程(Coroutine):是单线程下的并发,又称微线程,纤程。
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的,实现单线程下的并发。
子程序就是协程的一种特例
子程序的概念:程序可以互相调用,就是说,在一段程序a的中间,可以设置它执行另外的一段程序b,然后再回来继续执行本段程序a后边的部分,在其中调用的那另外的一段程序b就是这一程序a的子程序。
注意:
python的线程属于内核级别的,即由操作系统控制调度的抢占式多任务(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非io操作的切换与效率无关)
优点:
协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
不需要多线程的锁机制,因而效率更高
单线程内就可以实现并发的效果,最大限度地利用cpu
缺点:
协程的本质是单线程下,无法利用多核,但可以通过开启多进程,每个进程内开启多个线程,每个线程内开启协程
协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
协程特点:
必须在只有一个单线程里实现并发
修改共享数据不需加锁
协程拥有自己的寄存器上下文和栈
一个协程遇到IO操作自动切换到其它协程
实现原理:
协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。
Python对协程的支持是通过generator实现的。
在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。
但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming%s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing%s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return:%s' % r)
c.close()
c = consumer()
produce(c)
模块:
python中对于协程有两个模块,greenlet和gevent,但更适用于Unix/Linux下
Greenlet(greenlet的执行顺序需要我们手动控制)
gevent(自动切换,由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成)
asyncio
gevent多被用在python3以前,对windows的兼容性不佳,而且这几年都没更新了,所以就使用asyncio模块来实现协程
python的asyncio模块可以实现异步网络操作、并发、协程。当一个任务需要等待IO结果的时候,可以挂起当前任务,转而去执行其他任务。类似于JS单线程,通过Promise对象,async和wait方法来实现异步执行。使用await可以将耗时等待的操作挂起,让出控制权,从而实现异步处理。如果一个对象可以在 await语句中使用,那么它就是 可等待对象 。
可等待对象有三种主要类型: 协程(coroutine), 任务(task) 和 Future
协程函数: 定义形式为 async def 的函数;
协程对象: 调用协程函数所返回的对象
import asyncio
async def do_some_work(x):
print("waiting:",x)
# 当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕
await asyncio.sleep(x)
return "Done after {}s".format(x)
def callback(task):
print('Status:', task.result())
# 使用async关键字定义的函数就是一个协程对象
coroutine = do_some_work(2) #
# 创建一个事件loop
loop = asyncio.get_event_loop() #<_WindowsSelectorEventLoop running=False closed=False debug=False>
# stop(),run_forever(),close(),...
# 创建一个task对象,task是Future类的子类,对 coroutine 对象的进一步封装。
# task对象保存了协程运行后的状态(running、finished),用于未来获取协程的结果。
# 三种创建task的方式:
task = loop.create_task(coroutine) #
task = asyncio.ensure_future(coroutine) #低层级
task = asyncio.create_task(coroutine) #高层级,python3.7+
# 为task绑定回调方法
task.add_done_callback(callback)
# cancelled(),done(),result(),exception(),remove_done_callback(callback),...
# 将协程注册到事件循环 loop 中,然后启动
loop.run_until_complete(task) # 与task.result()的结果一致
# 如果没有显式地声明task,那么该方法会将coroutine封装成task对象 #loop.run_until_complete(coroutine)
多任务并行
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
并发运行 aws 指定的 可等待对象 ,并阻塞线程直到满足 return_when 指定的条件(FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED),返回两个 Task/Future 集合: (done, pending)。
# 将任务先放置在任务队列中,然后将任务队列传给 `asynicio.wait` 方法,就可以同时并行运行队列中的任务
tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]
loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(asyncio.wait(tasks))
for task in dones:
print("Task ret:", task.result())
嵌套协程
即一个协程中await了另外一个协程
async def main():
tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]
return await asyncio.gather(*tasks) #按列表顺序返回
# return await asyncio.wait(tasks) #不按顺序
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
print("Task ret:",result)
async def main():
tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]
for task in asyncio.as_completed(tasks): #as_complete
result = await task
print("Task ret: {}".format(result))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
协程停止
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
aiohttp
仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的,不是真正的异步执行!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
# 用aiohttp封装好的阻塞API,异步运行一个简单的服务器
import asyncio
from aiohttp import web
routes = web.RouteTableDef()
@routes.get('/')
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'
Hello Word!
', content_type='text/html', charset='utf-8')
@routes.get('/data')
async def json_data(request):
await asyncio.sleep(2)
return web.json_response({
'name': 'start a server',
'info':{
"author":"MerelySmile",
"date":"2019"
}
})
def init():
app = web.Application()
app.add_routes(routes)
web.run_app(app)
# run_app()提供了一个简单的阻塞API来运行Application,用于异步启动Application
# 监听http://0.0.0.0:8080,server地址是localhost:8080
init()
# 原生的方法,实例化一个AppRunner
# 官方文档:https://aiohttp.readthedocs.io/en/stable/web_advanced.html#aiohttp-web-app-runners
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'
Index
', content_type='text/html', charset='utf-8')
async def hello(request):
await asyncio.sleep(0.5)
text = '
hello,%s!
' % request.match_info['name']
return web.Response(body=text.encode('utf-8'),content_type='text/html')
async def init():
app = web.Application()
app.router.add_get('/', index)
app.router.add_get('/hello/{name}', hello)
runner = web.AppRunner(app) # app.make_handler()方法过期了,改用APPrunner
await runner.setup()
site = web.TCPSite(runner, '127.0.0.1', 8000)
await site.start()
print('Server started at http://127.0.0.1:8000...')
return site
loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()
# 基本使用
import aiohttp
url = 'https://api.github.com/some/endpoint'
headers = {'content-type': 'application/json'}
conn = aiohttp.ProxyConnector(proxy="http://some.proxy.com")
async with aiohttp.ClientSession(connector=conn) as session:
async with session.get(url,headers=headers,timeout=30) as resp:
assert r.status == 200
print(await resp.text())
异步请求(asyncio+aiohttp)
# 一个简单的实例
import asyncio
import aiohttp
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
return result
async def request():
url = 'http://127.0.0.1:8080'
print('Waiting for', url)
result = await get(url)
print('Get response from', url, 'Result:', result)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
多进程+协程
因为协程是一个线程执行,通过多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
使用aiomultiprocess模块(python3.6+),基于 multiprocessing 和 asyncio 实现
看了一会源码,debug了两遍,晕了呀,瞎猜一下吧,大概就是通过multiprocessing来创建一个上下文Contexts,实例化Manager来共享状态,使用Queue来实现进程间的通信。创建继承自Process的工作进程PoolWorker,注册loop来保存每个worker,就相当于将协程注册到事件循环 loop 中,然后再创建task来保存协程的执行结果。点到为止了,真想要看懂的话,还需要去推敲一下multiprocessing.managers。
# 进程池 + 异步协程
import asyncio
import aiohttp
from aiomultiprocess import Pool
async def get(url):
print('Waiting for', url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
print('Result:', result)
return result
async def request():
url = 'http://127.0.0.1:8080'
urls = [url for _ in range(5)]
async with Pool() as pool:
result = await pool.map(get, urls)
return result
if __name__ == '__main__':
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)