天天看點

使用django+rpc進行服務内部互動

一、為什麼使用rpc。

1)相比uwsgi,使用rpc的長連接可以不需要頻繁建立連接,提高傳輸效率。

2)rpc支援同步和異步,對于不需要等待傳回的消息可以不等待傳回繼續運作,減少用戶端等待時間。

3)使用rpc入口是我們自己定義的,可以根據不同消息類型定制不同的政策。

二、設計思路

使用統一入口,采用django的url resolve比對,然後完成調用,不改變django rest接口的開發模式。

服務端處理采用同步異步分離,異步任務用單獨的程序處理,并為異步任務制定處理政策:

1)對于同步任務,仍然需要立即調用傳回。

2)對于異步任務,可以進行任務分級:

      一級是重要任務,屬于系統能力不足時必須優先保障的;

      二級任務,在系統能力足夠時仍然需要執行,一旦能力不足,優先保障一級任務;

3)對異步任務,制定執行政策:

      一是必須執行的任務,這部分任務即使積壓也要一條條全部執行完成;

      二是隻需要執行最後一條的,常見于更新資訊,對于積壓多條的同一消息,丢棄前面的,保留最後一條;

      三是可丢棄的,遇到性能不足,這一類消息不執行,直接丢棄。

三、 grpc的proto檔案

syntax = "proto3";
package rpc;
// Single RPC entry point: every interface call goes through `handel`,
// which takes an Input message and returns an Output message.
service RPCServer {
  rpc handel(Input) returns (Output){}
}

// Request envelope. The client fills `params` with a JSON-encoded object
// (url, headers, method, content).
message Input {
  string params = 1;
}

// Response envelope. The server fills `content` with a JSON-encoded object
// holding `data` and `status_code`.
message Output {
  string content = 1;
}      

入參為Input,傳回為Output,所有接口調用都走這邊。

四、用戶端調用

import grpc
import time
import json
import traceback
import threading
import uuid
from datetime import datetime

from . import data_pb2, data_pb2_grpc

# Server address parts. Both are empty strings in this listing — presumably
# placeholders to be replaced with the real host and port (TODO confirm).
_HOST = ''
_PORT = ''
# Module-level channel, created once at import time and shared by every
# call_rpc() invocation.
CHANNEL = grpc.insecure_channel(_HOST + ':' + _PORT)


class ManoEncoder(json.JSONEncoder):
    """JSON encoder that emits datetime and UUID values as their str() form."""

    def default(self, obj):
        """Return str(obj) for datetime/UUID; defer anything else to the base class."""
        stringified_types = (datetime, uuid.UUID)
        if not isinstance(obj, stringified_types):
            # The base implementation raises TypeError for unsupported types.
            return json.JSONEncoder.default(self, obj)
        return str(obj)


def mano_encode(data):
    """Serialise *data* to a JSON string, using ManoEncoder for datetime and UUID values."""
    encoded = json.dumps(data, cls=ManoEncoder)
    return encoded


def call_rpc(url, headers, resource, content, logger):
    """Send one REST-style request to the RPC server and wait for its reply.

    Args:
        url: REST URL to be resolved on the server side.
        headers: Header mapping forwarded to the server.
        resource: Mapping that must contain 'method' and may contain
            'timeout' (passed to the gRPC call, defaults to 5).
        content: Request body; must be JSON-serialisable.
        logger: Logger used to report failures.

    Returns:
        A ``(status_code, body)`` tuple. On success ``body`` is the
        JSON-encoded ``data`` field of the server reply. On any failure the
        tuple is ``('409', <error text>)``; no exception escapes.
    """
    try:
        params = json.dumps({
            'url': url,
            'headers': headers,
            'method': resource['method'],
            'content': content
        })
        timeout = resource.get('timeout', 5)
        client = data_pb2_grpc.RPCServerStub(CHANNEL)
        response = client.handel.future(data_pb2.Input(params=params), timeout)
        # Future.result() blocks until the call completes (or its deadline
        # expires and it raises), so polling done() in a sleep loop is not needed.
        result = json.loads(response.result().content)
        return result['status_code'], mano_encode(result['data'])
    except Exception as err:
        # str(err) works on Python 2 and 3. ``err.message`` does not exist on
        # Python 3 and would raise AttributeError here, hiding the real failure.
        message = str(err)
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (url, message))
        return '409', message

入參params需包含:rest url(url)、頭資訊(headers)、rest請求方法(method),以及request body(content);

結果采用異步擷取,不持續占用連接,對于不需要結果的,可以不等待,這邊沒寫。

五、服務端實作

import os
import django

# Configure Django before the imports and module-level code below run
# (RESOLVER = get_resolver() is evaluated at import time).
# "*.settings" looks like a placeholder in this listing — presumably replaced
# with the real project's settings module (TODO confirm).
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "*.settings")
django.setup()

import grpc
import json
import time
import random
import traceback
import threading
import uuid
import logging
from datetime import datetime

from concurrent import futures
from multiprocessing import Process, Queue, Value
from Queue import Queue as ManoQueue

from . import data_pb2, data_pb2_grpc
from django.urls import get_resolver
from django.utils.functional import cached_property

_ONE_DAY_IN_SECONDS = 60 * 60 * 24  # not referenced in the code shown here
_HOST = '[::]'  # host part of the gRPC bind address
_PORT = '12330'  # port part of the gRPC bind address
_PROCESS_COUNT = 2  # not referenced in the code shown here
RESOLVER = get_resolver()  # Django URL resolver used to match request URLs to views
logger = logging.getLogger(__name__)

message_queue = Queue()  # async task queue, used for inter-process communication
status_level2 = Value('I', 1)  # level-2 queue status flag (worker sets it to 1 or 0), used for inter-process communication


class ManoEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime and UUID objects via str()."""

    def default(self, obj):
        """Stringify datetime/UUID values; let the base class handle (or reject) the rest."""
        needs_str = isinstance(obj, (datetime, uuid.UUID))
        if needs_str:
            return str(obj)
        # Falls through to the stock behaviour, which raises TypeError.
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    """Return *data* as a JSON string; datetime and UUID values go through ManoEncoder."""
    as_json = json.dumps(data, cls=ManoEncoder)
    return as_json


class RPCServer(data_pb2_grpc.RPCServerServicer):
    """gRPC servicer exposing the single ``handel`` entry point."""

    def handel(self, request, context):
        """Handle one request synchronously, or queue it for async processing.

        ``request.params`` is a JSON object. When its ``reply`` key is absent
        or ``True`` the request is executed immediately and its result is
        returned. Otherwise the raw params string is put on ``message_queue``
        and an acknowledgement ('201') is returned; a level-2 request is
        rejected with '409' while the level-2 status flag is 0.

        Returns:
            ``data_pb2.Output`` whose ``content`` is a JSON object with
            ``data`` and ``status_code``.
        """
        input_info = json.loads(request.params)
        if input_info.get('reply', True) is True:  # reply True means synchronous, otherwise asynchronous
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            status_code, data = self.call_sync(res_url, headers, method, content)
            return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))
        else:
            # Read the shared flag through ``.value``: the Value wrapper object
            # itself is always truthy, so testing it directly never rejects
            # anything.
            level2_paused = status_level2.value == 0
            if input_info['queue_detail']['level'] == 2 and level2_paused:
                data = 'queue of status level2 is not active'
                status_code = '409'
            else:
                message_queue.put(request.params)
                data = 'success'
                status_code = '201'
            return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))

    @staticmethod
    def call_sync(res_url, headers, method, content):
        """Run ``call_inner`` and convert any exception into a ('409', text) result.

        Returns:
            ``(status, body)`` as produced by ``call_inner``, or
            ``('409', <error text>)`` when it raises.
        """
        try:
            resp_status, resp_body = call_inner(res_url, headers, method, content, logger)
            return resp_status, resp_body
        except Exception as err:
            # str(err) instead of err.message: the attribute does not exist on
            # Python 3 and is not guaranteed on every Python 2 exception.
            message = str(err)
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (res_url, message))
            return '409', message


def main():  # main process of the rpc service
    """Start the gRPC server, then run the async worker process until it exits."""
    bind_address = '%s:%s' % (_HOST, _PORT)
    # Hold the server for as long as the service runs: start() does not block,
    # and a grpc server object that is no longer referenced can be shut down
    # when it is garbage collected.
    grpc_server = _run_server(bind_address)  # start the rpc server
    try:
        _run_queue_process()  # start the async task worker process (blocks until it exits)
    finally:
        if grpc_server is not None:
            grpc_server.stop(None)


def _run_server(bind_address):
    """Create, bind and start the gRPC server.

    Args:
        bind_address: ``host:port`` string to listen on (insecure port).

    Returns:
        The started ``grpc.Server``. The caller must keep a reference to it
        to keep the server alive.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100, ))
    data_pb2_grpc.add_RPCServerServicer_to_server(RPCServer(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()
    return grpc_server


def _run_queue_process():
    """Run the async-task consumer in a child process and block until it exits."""
    consumer = Process(
        target=_handle_no_wait_request,
        args=(message_queue, status_level2),
    )
    consumer.start()
    consumer.join()


def _handle_no_wait_request(q, status_2):  # classify and dispatch async tasks
    """Consume async requests and execute them on a bounded pool of threads.

    A monitor thread moves incoming messages from ``q`` into the in-process
    queues; this loop picks pending requests and hands them to worker threads.

    Args:
        q: Inter-process queue delivering the raw JSON request strings.
        status_2: Shared integer flag for the level-2 queue; this code writes
            1 to it when the level-2 backlog has drained.

    This function never returns.
    """
    max_workers = 50
    first_order_queue = [ManoQueue(maxsize=0), list()]
    second_order_queue = [ManoQueue(maxsize=1000), list()]
    mano_queue = [first_order_queue, second_order_queue]
    thread_pool = futures.ThreadPoolExecutor(max_workers=max_workers)
    # Counts free worker slots. The executor's private ``_threads`` set only
    # grows (idle threads are kept), so its size cannot tell whether a worker
    # is available and would report "busy" forever once all threads exist.
    free_slots = threading.BoundedSemaphore(max_workers)
    threading.Thread(target=_start_message_monitor, args=(q, mano_queue, status_2,)).start()  # classify async tasks by policy
    level2_backlog = second_order_queue[0]
    while True:
        # Re-open the level-2 queue as soon as its backlog has drained, even
        # when no new level-2 message arrives to trigger a re-check.
        if status_2.value == 0 and level2_backlog.qsize() < 0.6 * level2_backlog.maxsize:
            status_2.value = 1
        if not free_slots.acquire(False):
            logger.info('process busy')
            time.sleep(0.1)
            continue
        input_info = _get_request(mano_queue)  # next task to run; every non-empty queue has an equal chance
        if not input_info:
            # Nothing pending: give the slot back and back off briefly instead
            # of spinning (or crashing on the empty dict).
            free_slots.release()
            time.sleep(0.01)
            continue
        try:
            future = thread_pool.submit(  # hand over to a worker thread
                RPCServer.call_sync,
                input_info['url'],
                input_info['headers'],
                input_info['method'],
                input_info['content'],
            )
        except Exception:
            # A malformed request (e.g. a missing key) is dropped rather than
            # allowed to kill the dispatch loop.
            free_slots.release()
            logger.error(traceback.format_exc())
            continue
        # Free the slot when the task finishes, whatever its outcome.
        future.add_done_callback(lambda _done: free_slots.release())
        logger.info('handle success')


def _start_message_monitor(q, mano_queue, status_2):
    """Move messages from the inter-process queue into the in-process queues, forever.

    Args:
        q: Queue to read raw request strings from (blocking ``get``).
        mano_queue: The in-process queue structure passed on to ``_handel_by_queue``.
        status_2: Shared level-2 status flag passed on to ``_handel_by_queue``.
    """
    while True:
        data = q.get()
        try:
            _handel_by_queue(data, mano_queue, status_2)
        except Exception:
            # One malformed message (bad JSON, missing 'queue_detail' keys)
            # must not kill this thread; otherwise every later async request
            # would pile up unread.
            logger.error(traceback.format_exc())


def _get_request(mano_queue):
    """Pick one pending request at random among the non-empty queues.

    Returns:
        The JSON-decoded request, or an empty dict when nothing is pending.
    """
    candidates = _get_active_queue(mano_queue)
    if not candidates:
        return {}
    # Labels have the form '<level index>_<slot index>'.
    level_part, slot_part = random.choice(candidates).split('_')
    source = mano_queue[int(level_part)][int(slot_part)]
    # A ManoQueue is read with get(); the plain list is consumed from the front.
    raw = source.get() if isinstance(source, ManoQueue) else source.pop(0)
    return json.loads(raw)


def _get_active_queue(mano_queue):
    active_index = []
    if not mano_queue[0][0].empty():
        active_index.append('0_0')
    if not mano_queue[1][0].empty():
        active_index.append('1_0')
    if len(mano_queue[0][1]) != 0:
        active_index.append('0_1')
    if len(mano_queue[1][1]) != 0:
        active_index.append('1_1')
    return active_index


def _handel_by_queue(data, mano_queue, status_2):  # 根據請求級别進行消息分類
    input_info = json.loads(data)
    level = input_info['queue_detail']['level']
    policy = input_info['queue_detail']['limit_policy']
    if level == 1:
        _handel_by_policy(mano_queue[0], policy, data)
    elif level == 2:
        request_queue = mano_queue[1]
        _handel_by_policy(mano_queue[1], policy, data)
        if request_queue[0].qsize() > 0.8 * request_queue[0].maxsize:
            status_2.value = 0
        elif request_queue[0].qsize() < 0.6 * request_queue[0].maxsize:
            status_2.value = 1


def _handel_by_policy(request_queue, policy, data):  # 根據請求政策進行消息分類
    if policy == 'execute':  # 必須執行的異步任務
        request_queue[0].put(data)
    elif policy == 'last':  # 阻塞時可以隻執行最後一次的異步任務
        try:
            while True:
                request_queue[1].remove(data)
        except ValueError:
            request_queue[1].append(data)
    else:  # 阻塞時可以丢棄的異步任務
        if request_queue[0].qsize < request_queue[0].maxsize * 0.6:
            request_queue[0].put(data)  # 先丢棄前面的


def call_inner(res_url, headers, method, content, logger):
    """Resolve ``res_url`` with Django's URL resolver and invoke the matching view method.

    Args:
        res_url: Full request URL, optionally with a query string.
        headers: Header mapping, passed to ``get_meta``.
        method: HTTP method name; its lower-case form selects the view method.
        content: Request body (dict, JSON string, or empty).
        logger: Logger used for tracing and error reporting.

    Returns:
        ``(status, body)``: the view response's status code as a string plus
        its ``data``; ``('404', ...)`` when the method is empty or the view
        has no handler for it; ``('409', <error text>)`` when the view raises.

    Exceptions raised before the view is called (for example by URL
    resolution) propagate to the caller.
    """
    logger.info('[call_inner] url is %s' % res_url)
    # Validate the method before it is used; checking after method.lower()
    # can never take effect.
    if not method:
        return '404', 'not support this operate'
    url, params = get_url_and_params(res_url)
    meta = get_meta(headers)
    request = Request(url=url, full_url=res_url, params=params, content=content, meta=meta, method=method)
    resolver_match = RESOLVER.resolve(url)  # URL matching
    callback, callback_args, callback_kwargs = resolver_match
    # Default of None: a view without a handler for this method yields a 404
    # instead of an AttributeError.
    call_method = getattr(callback.view_class(), method.lower(), None)
    if call_method is None:
        return '404', 'not support this operate'
    try:
        if callback_kwargs:
            result = call_method(request, '', **callback_kwargs)
        else:
            result = call_method(request)
    except Exception as err:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are not turned into a '409' reply. str(err) replaces
        # err.message, which does not exist on Python 3.
        message = str(err)
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (res_url, message))
        return '409', message
    return str(result.status_code), result.data


def get_url_and_params(full_url):
    """Split a URL into its path and a dict of query parameters.

    Everything after the first '?' is treated as the query string. Pairs are
    separated by '&'; each pair is split at its first '=' so values may
    themselves contain '='. A key without '=' maps to ''. Empty segments
    (a trailing '&', a bare '?') are skipped. Keys and values are returned
    as-is; no URL-decoding is performed.

    Returns:
        ``(url, params)`` — the part before '?' and the parameter dict.
    """
    url, _sep, query = full_url.partition('?')
    params = {}
    for pair in query.split('&'):
        if not pair:
            continue
        key, _eq, value = pair.partition('=')
        params[key] = value
    return url, params


def get_meta(headers):
    """Build the META mapping for the synthetic request.

    Placeholder hook: ``headers`` is currently ignored and a new empty dict
    is returned on every call.
    """
    # custom
    return {}


class Request(object):
    """Minimal request object handed to the view method by ``call_inner``."""

    def __init__(self, **kwargs):
        """Populate the request from keyword arguments.

        Required keys: 'content', 'params', 'url', 'full_url', 'meta', 'method'.
        """
        body = kwargs['content']
        self.data = self.get_content(body)
        self.query_params = kwargs['params']
        self.path = kwargs['url']
        self.full_path = kwargs['full_url']
        self.FILES = {}
        self.META = kwargs['meta']
        self.COOKIES = {}
        self._request = InnerOBJ(kwargs['method'])

    @staticmethod
    def get_content(content):
        """Normalise the request body: falsy -> {}, dict -> unchanged, otherwise JSON-decode."""
        if not content:
            return {}
        if isinstance(content, dict):
            return content
        return json.loads(content)

    def __str__(self):
        return '<Request> %s' % self.path

    @cached_property
    def GET(self):
        """Query parameters, exposed under the name ``GET``."""
        return self.query_params

    def get_full_path(self):
        """Return the full URL this request was built from."""
        return self.full_path


class InnerOBJ(object):
    """Holder for the upper-cased HTTP method name."""

    def __init__(self, method):
        """Store ``method`` upper-cased in ``self.method``."""
        normalized = method.upper()
        self.method = normalized


# Script entry point: start the RPC service when this module is run directly.
if __name__ == '__main__':
    main()