天天看点

python协程实现一万并发_python并发编程(三)—— 异步协程(asyncio,aiohttp)Hello Word!Indexhello,%s!

python协程实现一万并发_python并发编程(三)—— 异步协程(asyncio,aiohttp)Hello Word!Indexhello,%s!

概念

协程(Coroutine):是单线程下的并发,又称微线程,纤程。

协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的,实现单线程下的并发。

子程序就是协程的一种特例

子程序的概念:程序可以互相调用,就是说,在一段程序a的中间,可以设置它执行另外的一段程序b,然后再回来继续执行本段程序a后边的部分,在其中调用的那另外的一段程序b就是这一程序a的子程序。

注意:

python的线程属于内核级别的,即由操作系统控制调度的抢占式多任务(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非io操作的切换与效率无关)

优点:

协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

不需要多线程的锁机制,因而效率更高

单线程内就可以实现并发的效果,最大限度地利用cpu

缺点:

协程的本质是单线程下,无法利用多核,但可以通过开启多进程,每个进程内开启多个线程,每个线程内开启协程

协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

协程特点:

必须在只有一个单线程里实现并发

修改共享数据不需加锁

协程拥有自己的寄存器上下文和栈

一个协程遇到IO操作自动切换到其它协程

实现原理:

协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。

Python对协程的支持是通过generator实现的。

在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。

但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。

def consumer():

r = ''

while True:

n = yield r

if not n:

return

print('[CONSUMER] Consuming%s...' % n)

r = '200 OK'

def produce(c):

c.send(None)

n = 0

while n < 5:

n = n + 1

print('[PRODUCER] Producing%s...' % n)

r = c.send(n)

print('[PRODUCER] Consumer return:%s' % r)

c.close()

c = consumer()

produce(c)

模块:

python中对于协程有两个模块,greenlet和gevent,但更适用于Unix/Linux下

Greenlet(greenlet的执行顺序需要我们手动控制)

gevent(自动切换,由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成)

asyncio

gevent多被用在python3以前,对windows的兼容性不佳,而且这几年都没更新了,所以就使用asyncio模块来实现协程

python的asyncio模块可以实现异步网络操作、并发、协程。当一个任务需要等待IO结果的时候,可以挂起当前任务,转而去执行其他任务。类似于JS单线程,通过Promise对象,async和wait方法来实现异步执行。使用await可以将耗时等待的操作挂起,让出控制权,从而实现异步处理。如果一个对象可以在 await语句中使用,那么它就是 可等待对象 。

可等待对象有三种主要类型: 协程(coroutine), 任务(task) 和 Future

协程函数: 定义形式为 async def 的函数;

协程对象: 调用协程函数所返回的对象

import asyncio

async def do_some_work(x):

print("waiting:",x)

# 当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕

await asyncio.sleep(x)

return "Done after {}s".format(x)

def callback(task):

print('Status:', task.result())

# 使用async关键字定义的函数就是一个协程对象

coroutine = do_some_work(2) #

# 创建一个事件loop

loop = asyncio.get_event_loop() #<_WindowsSelectorEventLoop running=False closed=False debug=False>

# stop(),run_forever(),close(),...

# 创建一个task对象,task是Future类的子类,对 coroutine 对象的进一步封装。

# task对象保存了协程运行后的状态(running、finished),用于未来获取协程的结果。

# 三种创建task的方式:

task = loop.create_task(coroutine) #

task = asyncio.ensure_future(coroutine) #低层级

task = asyncio.create_task(coroutine) #高层级,python3.7+

# 为task绑定回调方法

task.add_done_callback(callback)

# cancelled(),done(),result(),exception(),remove_done_callback(callback),...

# 将协程注册到事件循环 loop 中,然后启动

loop.run_until_complete(task) # 与task.result()的结果一致

# 如果没有显式地声明task,那么该方法会将coroutine封装成task对象 #loop.run_until_complete(coroutine)

多任务并行

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发运行 aws 指定的 可等待对象 ,并阻塞线程直到满足 return_when 指定的条件(FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED),返回两个 Task/Future 集合: (done, pending)。

# 将任务先放置在任务队列中,然后将任务队列传给 `asynicio.wait` 方法,就可以同时并行运行队列中的任务

tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]

loop = asyncio.get_event_loop()

dones, pendings = loop.run_until_complete(asyncio.wait(tasks))

for task in dones:

print("Task ret:", task.result())

嵌套协程

即一个协程中await了另外一个协程

async def main():

tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]

return await asyncio.gather(*tasks) #按列表顺序返回

# return await asyncio.wait(tasks) #不按顺序

loop = asyncio.get_event_loop()

results = loop.run_until_complete(main())

for result in results:

print("Task ret:",result)

async def main():

tasks = [asyncio.ensure_future(do_some_work(i)) for i in range(5)]

for task in asyncio.as_completed(tasks): #as_complete

result = await task

print("Task ret: {}".format(result))

loop = asyncio.get_event_loop()

loop.run_until_complete(main())

协程停止

loop = asyncio.get_event_loop()

try:

loop.run_until_complete(asyncio.wait(tasks))

except KeyboardInterrupt as e:

print(asyncio.Task.all_tasks())

for task in asyncio.Task.all_tasks():

print(task.cancel())

loop.stop()

loop.run_forever()

finally:

loop.close()

aiohttp

仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的,不是真正的异步执行!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了

asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

# 用aiohttp封装好的阻塞API,异步运行一个简单的服务器

import asyncio

from aiohttp import web

routes = web.RouteTableDef()

@routes.get('/')

async def index(request):

await asyncio.sleep(0.5)

return web.Response(body=b'

Hello Word!

', content_type='text/html', charset='utf-8')

@routes.get('/data')

async def json_data(request):

await asyncio.sleep(2)

return web.json_response({

'name': 'start a server',

'info':{

"author":"MerelySmile",

"date":"2019"

}

})

def init():

app = web.Application()

app.add_routes(routes)

web.run_app(app)

# run_app()提供了一个简单的阻塞API来运行Application,用于异步启动Application

# 监听http://0.0.0.0:8080,server地址是localhost:8080

init()

# 原生的方法,实例化一个AppRunner

# 官方文档:https://aiohttp.readthedocs.io/en/stable/web_advanced.html#aiohttp-web-app-runners

import asyncio

from aiohttp import web

async def index(request):

await asyncio.sleep(0.5)

return web.Response(body=b'

Index

', content_type='text/html', charset='utf-8')

async def hello(request):

await asyncio.sleep(0.5)

text = '

hello,%s!

' % request.match_info['name']

return web.Response(body=text.encode('utf-8'),content_type='text/html')

async def init():

app = web.Application()

app.router.add_get('/', index)

app.router.add_get('/hello/{name}', hello)

runner = web.AppRunner(app) # app.make_handler()方法过期了,改用APPrunner

await runner.setup()

site = web.TCPSite(runner, '127.0.0.1', 8000)

await site.start()

print('Server started at http://127.0.0.1:8000...')

return site

loop = asyncio.get_event_loop()

loop.run_until_complete(init())

loop.run_forever()

# 基本使用

import aiohttp

url = 'https://api.github.com/some/endpoint'

headers = {'content-type': 'application/json'}

conn = aiohttp.ProxyConnector(proxy="http://some.proxy.com")

async with aiohttp.ClientSession(connector=conn) as session:

async with session.get(url,headers=headers,timeout=30) as resp:

assert r.status == 200

print(await resp.text())

异步请求(asyncio+aiohttp)

# 一个简单的实例

import asyncio

import aiohttp

async def get(url):

async with aiohttp.ClientSession() as session:

async with session.get(url) as resp:

result = await resp.text()

return result

async def request():

url = 'http://127.0.0.1:8080'

print('Waiting for', url)

result = await get(url)

print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

多进程+协程

因为协程是一个线程执行,通过多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

使用aiomultiprocess模块(python3.6+),基于 multiprocessing 和 asyncio 实现

看了一会源码,debug了两遍,晕了呀,瞎猜一下吧,大概就是通过multiprocessing来创建一个上下文Contexts,实例化Manager来共享状态,使用Queue来实现进程间的通信。创建继承自Process的工作进程PoolWorker,注册loop来保存每个worker,就相当于将协程注册到事件循环 loop 中,然后再创建task来保存协程的执行结果。点到为止了,真想要看懂的话,还需要去推敲一下multiprocessing.managers。

# 进程池 + 异步协程

import asyncio

import aiohttp

from aiomultiprocess import Pool

async def get(url):

print('Waiting for', url)

async with aiohttp.ClientSession() as session:

async with session.get(url) as resp:

result = await resp.text()

print('Result:', result)

return result

async def request():

url = 'http://127.0.0.1:8080'

urls = [url for _ in range(5)]

async with Pool() as pool:

result = await pool.map(get, urls)

return result

if __name__ == '__main__':

coroutine = request()

task = asyncio.ensure_future(coroutine)

loop = asyncio.get_event_loop()

loop.run_until_complete(task)