一、為什麼使用rpc。
1)相比uwsgi,使用rpc的長連接可以不需要頻繁建立連接,提高傳輸效率。
2)rpc支援同步和異步,對於不需要等待傳回的消息可以不等待傳回繼續運作,減少用戶端等待時間。
3)使用rpc時入口是我們自己定義的,可以根據不同消息類型定制不同的策略。
二、設計思路
使用統一入口,採用django的url resolve比對,然後完成調用,不改變django rest接口的開發模式。
服務端處理採用同步異步分離,異步任務用單獨的程序處理,並為異步任務制定處理策略:
1)對于同步任務,仍然需要立即調用傳回。
2)對于異步任務,可以進行任務分級:
一級是重要任務,屬于系統能力不足時必須優先保障的;
二級任務,在系統能力足夠時仍然需要執行,一旦能力不足,優先保障一級任務;
3)對異步任務,制定執行策略:
一是必須執行的任務,這部分任務即使積壓也要一條條全部執行完成;
二是只需要執行最後一條的,常見於更新資訊,對於積壓多條的同一消息,丟棄前面的,保留最後一條;
三是可丟棄的,遇到性能不足,這一類消息不執行,直接丟棄。
三、 grpc的proto檔案
// Single generic RPC entry point: every call goes through one method that
// exchanges string payloads.
syntax = "proto3";
package rpc;

service RPCServer {
    // NOTE: "handel" (sic) is the method name the generated stubs and both
    // Python modules use, so the spelling is part of the wire interface.
    rpc handel(Input) returns (Output){}
}

// Request envelope; the Python caller stores a JSON document in `params`.
message Input {
    string params = 1;
}

// Response envelope; the Python server stores a JSON document in `content`.
message Output {
    string content = 1;
}
入參為Input,傳回為Output,所有接口調用都走這邊。
四、用戶端調用
import grpc
import time
import json
import traceback
import threading
import uuid
from datetime import datetime
from . import data_pb2, data_pb2_grpc
# Address of the RPC server. Both values are empty placeholders here and have
# to be filled in before the channel below can reach anything.
_HOST = ''
_PORT = ''
# Insecure channel created once at import time; call_rpc() builds its stub on it.
CHANNEL = grpc.insecure_channel(_HOST + ':' + _PORT)
class ManoEncoder(json.JSONEncoder):
    """JSON encoder that renders ``datetime`` and ``uuid.UUID`` values via ``str()``."""

    def default(self, obj):
        """Return ``str(obj)`` for datetime/UUID objects, else defer to the base class.

        The base implementation raises ``TypeError`` for objects it cannot
        serialize, so unsupported types still fail loudly.
        """
        stringified_types = (datetime, uuid.UUID)
        if not isinstance(obj, stringified_types):
            return super(ManoEncoder, self).default(obj)
        return str(obj)
def mano_encode(data):
    """Serialize *data* to a JSON string using ``ManoEncoder`` with default options."""
    encoder = ManoEncoder()
    return encoder.encode(data)
def call_rpc(url, headers, resource, content, logger):
    """Send one REST-style request through the generic ``handel`` RPC.

    Args:
        url: REST url to be resolved on the server side.
        headers: header mapping forwarded to the server.
        resource: mapping that must contain ``'method'`` and may contain
            ``'timeout'`` (seconds, defaults to 5).
        content: request body forwarded to the server.
        logger: logger used for diagnostics.

    Returns:
        ``(status_code, body)`` where ``body`` is the JSON-encoded ``data``
        field of the reply. On any failure ``('409', <error text>)`` is
        returned instead of raising.
    """
    try:
        params = json.dumps({
            'url': url,
            'headers': headers,
            'method': resource['method'],
            'content': content,
        })
        timeout = resource.get('timeout', 5)
        client = data_pb2_grpc.RPCServerStub(CHANNEL)
        response = client.handel.future(data_pb2.Input(params=params), timeout)
        # result() already blocks until the call completes or fails, so no
        # explicit done() polling loop is needed.
        result = json.loads(response.result().content)
        logger.debug('call url %s returned status %s' % (url, result['status_code']))
        return result['status_code'], mano_encode(result['data'])
    except Exception as err:
        # str(err) instead of err.message: the attribute is deprecated since
        # Python 2.6 and missing on Python 3, which made this handler raise.
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (url, err))
        return '409', str(err)
入參params需包含:rest url,頭資訊headers,rest類型,以及request body;
結果採用異步擷取,不持續占用連接,對於不需要結果的,可以不等待,這邊沒寫。
五、服務端實作
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "*.settings")
django.setup()
import grpc
import json
import time
import random
import traceback
import threading
import uuid
import logging
from datetime import datetime
from concurrent import futures
from multiprocessing import Process, Queue, Value
from Queue import Queue as ManoQueue
from . import data_pb2, data_pb2_grpc
from django.urls import get_resolver
from django.utils.functional import cached_property
# NOTE(review): _ONE_DAY_IN_SECONDS and _PROCESS_COUNT are not referenced by
# any code visible in this file - confirm whether they are still needed.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Listen address and port of the gRPC server.
_HOST = '[::]'
_PORT = '12330'
_PROCESS_COUNT = 2
# Django URL resolver, used by call_inner() to map a url onto its view.
RESOLVER = get_resolver()
logger = logging.getLogger(__name__)
message_queue = Queue() # async task queue, used for inter-process communication
status_level2 = Value('I', 1) # level-2 queue status, used for inter-process communication
class ManoEncoder(json.JSONEncoder):
    """JSON encoder with string fallbacks for ``datetime`` and ``uuid.UUID``."""

    def default(self, obj):
        """Stringify datetime/UUID objects; delegate everything else to JSONEncoder."""
        for plain_type in (datetime, uuid.UUID):
            if isinstance(obj, plain_type):
                return str(obj)
        # Raises TypeError for objects that cannot be serialized.
        return json.JSONEncoder.default(self, obj)
def mano_encode(data):
    """Return *data* as a JSON string, encoded by ``ManoEncoder``."""
    serialized = json.dumps(obj=data, cls=ManoEncoder)
    return serialized
class RPCServer(data_pb2_grpc.RPCServerServicer):
    """Servicer behind the single generic ``handel`` RPC."""

    def handel(self, request, context):
        """Dispatch one request either synchronously or onto the async queue.

        ``request.params`` is a JSON document. When its ``reply`` key is
        ``True`` (the default) the call is executed immediately and its result
        returned. Otherwise the raw params are pushed onto ``message_queue``
        and ``'201'`` is returned, unless the message is level 2 and the
        shared level-2 flag is 0, in which case ``'409'`` is returned.
        """
        input_info = json.loads(request.params)
        if input_info.get('reply', True) is True:  # reply True means synchronous
            status_code, data = self.call_sync(
                input_info['url'],
                input_info['headers'],
                input_info['method'],
                input_info['content'],
            )
        elif input_info['queue_detail']['level'] == 2 and not status_level2.value:
            # Read .value: the Value wrapper object itself is always truthy,
            # so testing it directly never rejected anything.
            data = 'queue of status level2 is not active'
            status_code = '409'
        else:
            message_queue.put(request.params)
            data = 'success'
            status_code = '201'
        return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))

    @staticmethod
    def call_sync(res_url, headers, method, content):
        """Run ``call_inner`` and turn any exception into a ``('409', text)`` result."""
        try:
            resp_status, resp_body = call_inner(res_url, headers, method, content, logger)
            return resp_status, resp_body
        except Exception as err:
            # str(err) works on Python 2 and 3; err.message does not exist on 3.
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (res_url, err))
            return '409', str(err)
def main():  # rpc service main process
    """Start the gRPC server, then run the async-task worker process.

    ``_run_queue_process`` blocks on the worker, which keeps this process
    (and the gRPC server threads) alive.
    """
    bind_address = '%s:%s' % (_HOST, _PORT)
    # Keep a reference for the lifetime of main(): a grpc server that gets
    # garbage-collected begins shutting down.
    grpc_server = _run_server(bind_address)  # start the rpc server
    _run_queue_process()  # start the async task worker process
    return grpc_server


def _run_server(bind_address):
    """Create, bind and start the gRPC server.

    Args:
        bind_address: ``host:port`` string to listen on (insecure port).

    Returns:
        The started server. ``start()`` does not block, so the caller must
        hold on to the returned object to keep the server running.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
    data_pb2_grpc.add_RPCServerServicer_to_server(RPCServer(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()
    return grpc_server
def _run_queue_process():
    """Spawn the async-task worker process and block until it exits."""
    worker_args = (message_queue, status_level2)
    async_worker = Process(target=_handle_no_wait_request, args=worker_args)
    async_worker.start()
    async_worker.join()
def _handle_no_wait_request(q, status_2):  # async task dispatch loop
    """Worker-process main loop: classify queued messages and execute them.

    A monitor thread moves raw messages from *q* into the per-level queues;
    this loop picks one pending message at a time and hands it to a thread
    pool.

    Args:
        q: inter-process queue carrying the raw JSON params of async requests.
        status_2: shared integer flag; 0 means level-2 messages are rejected.
    """
    max_workers = 50
    first_order_queue = [ManoQueue(maxsize=0), list()]
    second_order_queue = [ManoQueue(maxsize=1000), list()]
    mano_queue = [first_order_queue, second_order_queue]
    level2_backlog = second_order_queue[0]
    thread_pool = futures.ThreadPoolExecutor(max_workers=max_workers)
    # Counts free worker slots. The pool's private _threads list only ever
    # grows, so it cannot be used to tell whether the pool is busy.
    free_slots = threading.BoundedSemaphore(max_workers)
    # Classify incoming messages by level and policy in the background.
    threading.Thread(target=_start_message_monitor, args=(q, mano_queue, status_2,)).start()
    while True:
        # Re-open level 2 once its backlog has drained; without this the flag
        # could only change when another level-2 message arrived.
        if not status_2.value and level2_backlog.qsize() < 0.6 * level2_backlog.maxsize:
            status_2.value = 1
        if not free_slots.acquire(False):
            logger.info('process busy')
            time.sleep(0.1)
            continue
        input_info = _get_request(mano_queue)  # every non-empty queue has an equal chance
        if not input_info:
            # Nothing pending: give the slot back and avoid a hot spin.
            free_slots.release()
            time.sleep(0.01)
            continue
        try:
            future = thread_pool.submit(
                RPCServer.call_sync,
                input_info['url'],
                input_info['headers'],
                input_info['method'],
                input_info['content'],
            )
        except Exception:
            # A malformed message must not take the whole loop down.
            free_slots.release()
            logger.error(traceback.format_exc())
            continue
        future.add_done_callback(lambda _done: free_slots.release())
        logger.info('handle success')
def _start_message_monitor(q, mano_queue, status_2):
    """Forever move messages from *q* into the classified queues.

    Args:
        q: inter-process queue of raw JSON message strings.
        mano_queue: per-level queue structure filled by ``_handel_by_queue``.
        status_2: shared level-2 status flag, forwarded to ``_handel_by_queue``.
    """
    while True:
        data = q.get()
        try:
            _handel_by_queue(data, mano_queue, status_2)
        except Exception:
            # One bad message (invalid JSON, missing keys) would otherwise
            # kill this thread and silently stop all async processing.
            logger.error(traceback.format_exc())
            logger.error('drop invalid async message: %r' % (data,))
def _get_request(mano_queue):
    """Pick one pending message at random among the non-empty queues.

    Returns:
        The decoded message dict, or ``{}`` when nothing is pending.
    """
    candidates = _get_active_queue(mano_queue)
    if not candidates:
        return {}
    # Indices are encoded as "<level>_<slot>".
    chosen = random.choice(candidates)
    pieces = chosen.split('_')
    level_idx, slot_idx = int(pieces[0]), int(pieces[1])
    source = mano_queue[level_idx][slot_idx]
    if isinstance(source, ManoQueue):
        raw_message = source.get()
    else:
        raw_message = source.pop(0)
    return json.loads(raw_message)
def _get_active_queue(mano_queue):
    """List the "<level>_<slot>" indices of every non-empty queue.

    Slot 0 of each level is a queue object (checked with ``empty()``) and
    slot 1 is a list. Queue slots are reported first, then list slots.
    """
    levels = (0, 1)
    active = ['%d_0' % level for level in levels if not mano_queue[level][0].empty()]
    active.extend('%d_1' % level for level in levels if len(mano_queue[level][1]) != 0)
    return active
def _handel_by_queue(data, mano_queue, status_2):  # classify a message by its level
    """Route one raw message to its level's queues and update the level-2 flag.

    Level-1 messages go to ``mano_queue[0]``, level-2 messages to
    ``mano_queue[1]``; any other level is ignored. After a level-2 message
    the flag is set to 0 above 80% of the queue's maxsize and back to 1
    below 60%.
    """
    detail = json.loads(data)['queue_detail']
    level = detail['level']
    policy = detail['limit_policy']
    if level == 1:
        _handel_by_policy(mano_queue[0], policy, data)
        return
    if level != 2:
        return
    level2_queues = mano_queue[1]
    _handel_by_policy(level2_queues, policy, data)
    backlog = level2_queues[0]
    if backlog.qsize() > 0.8 * backlog.maxsize:
        status_2.value = 0
    elif backlog.qsize() < 0.6 * backlog.maxsize:
        status_2.value = 1
def _handel_by_policy(request_queue, policy, data):  # classify a message by its policy
    """Store *data* according to its limit policy.

    Args:
        request_queue: ``[queue, list]`` pair of one level.
        policy: ``'execute'`` always enqueues; ``'last'`` keeps only the most
            recent copy of an identical message in the list; anything else is
            droppable and only enqueued while the queue is below 60% full.
        data: raw JSON message string.
    """
    backlog, latest_only = request_queue[0], request_queue[1]
    if policy == 'execute':  # must always be executed
        backlog.put(data)
    elif policy == 'last':  # only the latest identical message matters
        try:
            while True:
                latest_only.remove(data)
        except ValueError:
            # All earlier copies are gone; keep just this one.
            latest_only.append(data)
    else:  # may be dropped under load
        # maxsize <= 0 means the queue is unbounded, so it is never "full".
        unbounded = backlog.maxsize <= 0
        # qsize must be called; comparing the bound method never did the
        # intended check.
        if unbounded or backlog.qsize() < backlog.maxsize * 0.6:
            backlog.put(data)
def call_inner(res_url, headers, method, content, logger):
    """Resolve *res_url* with the Django resolver and call the matching view method.

    Args:
        res_url: full REST url, optionally with a query string.
        headers: request headers, passed to ``get_meta``.
        method: HTTP verb; the view handler of the same lower-cased name is called.
        content: request body (dict or JSON string).
        logger: logger used for diagnostics.

    Returns:
        ``(status_code, data)``: ``'404'`` when the verb is missing or the view
        has no handler for it, ``'409'`` when the handler raises, otherwise
        the handler result's ``status_code`` (as a string) and ``data``.
    """
    logger.info('[call_inner] url is %s' % res_url)
    if not method:
        # Checked before method.lower() is evaluated, so an empty verb gets
        # the intended 404 rather than an AttributeError.
        return '404', 'not support this operate'
    url, params = get_url_and_params(res_url)
    meta = get_meta(headers)
    request = Request(url=url, full_url=res_url, params=params, content=content, meta=meta, method=method)
    resolver_match = RESOLVER.resolve(url)  # URL matching
    callback, _callback_args, callback_kwargs = resolver_match
    call_method = getattr(callback.view_class(), method.lower(), None)
    if call_method is None:
        return '404', 'not support this operate'
    try:
        if callback_kwargs:
            result = call_method(request, '', **callback_kwargs)
        else:
            result = call_method(request)
    except BaseException as err:
        # BaseException is kept as in the original so the set of errors
        # converted into a 409 does not change.
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (res_url, err))
        return '409', str(err)
    return str(result.status_code), result.data
def get_url_and_params(full_url):
    """Split *full_url* into its path and a dict of query parameters.

    Values are returned as-is (no URL decoding). A key without ``=`` maps to
    an empty string, empty segments are skipped, and only the first ``=`` of
    a pair separates key from value.

    Returns:
        ``(url, params)`` where ``url`` is everything before the first ``?``.
    """
    url, _sep, params_str = full_url.partition('?')
    params = {}
    for key_value in params_str.split('&'):
        if not key_value:
            # Covers "no query string", a trailing "?" and "&&".
            continue
        key, _eq, value = key_value.partition('=')
        params[key] = value
    return url, params
def get_meta(headers):
    """Build the request META mapping from *headers*.

    Placeholder: currently ignores *headers* and returns a new empty dict.
    """
    # custom
    return dict()
class Request(object):
    """Lightweight request object handed to the view methods.

    Built from the keyword arguments ``content``, ``params``, ``url``,
    ``full_url``, ``meta`` and ``method``.
    """

    def __init__(self, **kwargs):
        body = self.get_content(kwargs['content'])
        query = kwargs['params']
        path = kwargs['url']
        full_path = kwargs['full_url']
        meta = kwargs['meta']
        inner = InnerOBJ(kwargs['method'])

        self.data = body
        self.query_params = query
        self.path = path
        self.full_path = full_path
        self.FILES = {}
        self.META = meta
        self.COOKIES = {}
        self._request = inner

    @staticmethod
    def get_content(content):
        """Return *content* as a dict: ``{}`` if falsy, parsed from JSON if not a dict."""
        if not content:
            return {}
        if isinstance(content, dict):
            return content
        return json.loads(content)

    def __str__(self):
        return '<Request> %s' % self.path

    @cached_property
    def GET(self):
        """Alias for ``query_params``."""
        return self.query_params

    def get_full_path(self):
        """Return the url including its query string, as originally given."""
        return self.full_path
class InnerOBJ(object):
    """Minimal object exposing only an upper-cased ``method`` attribute."""

    def __init__(self, method):
        upper_cased = method.upper()
        self.method = upper_cased
# Script entry point: start the RPC service when the module is run directly.
if __name__ == '__main__':
    main()