本章会用两种方式来实现 RPC:原生实现和基于 gRPC 框架的实现。
一、基础实现
1.1、Server
from multiprocessing.connection import Listener
from threading import Thread
from remote_call import RPCHandler
def rpc_server(handler, address, authkey):
    """Accept RPC clients forever, serving each one on its own daemon thread.

    Args:
        handler: Object exposing ``handle_connection(connection)``; it is
            called once, in a new thread, for every accepted client.
        address: Address handed to ``Listener`` (for example a
            ``(host, port)`` tuple).
        authkey: Bytes secret handed to ``Listener`` for authenticating
            clients.

    The loop never ends on its own; the function is only left when an
    exception (for example ``KeyboardInterrupt``) propagates out of it.
    """
    # The context manager closes the listening socket when the loop is left
    # by an exception, instead of leaking it.
    with Listener(address, authkey=authkey) as sock:
        while True:
            client = sock.accept()
            # Daemon threads do not keep the process alive at shutdown.
            worker = Thread(
                target=handler.handle_connection,
                args=(client,),
                daemon=True,
            )
            worker.start()
# Some remote functions
def add(x, y):
    """Return ``x + y``; exposed to clients as the remote function ``add``."""
    total = x + y
    return total
def sub(x, y):
    """Return ``x - y``; exposed to clients as the remote function ``sub``."""
    difference = x - y
    return difference
# Create the handler and publish both remote functions through it
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)

# Start serving on localhost:17000; this call blocks in the accept loop
rpc_server(handler, address=('localhost', 17000), authkey=b'peekaboo')
1.2、Handler
import pickle


class RPCHandler:
    """Dispatch pickled RPC requests to functions registered by name."""

    def __init__(self):
        # Maps each registered function's ``__name__`` to the callable.
        self._functions = {}

    def register_function(self, func):
        """Make ``func`` callable by clients under ``func.__name__``."""
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        """Serve requests arriving on ``connection`` until the peer hangs up.

        Each request is a pickled ``(func_name, args, kwargs)`` tuple. The
        reply is the pickled return value, or the pickled exception object
        if looking up or calling the function raised.

        Args:
            connection: Object with ``recv()``, ``send()`` and ``close()``;
                ``recv()`` is expected to raise ``EOFError`` at end of stream.
        """
        try:
            while True:
                # SECURITY: unpickling can execute arbitrary code. Only use
                # this handler with authenticated, trusted peers.
                func_name, args, kwargs = pickle.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args, **kwargs)
                    connection.send(pickle.dumps(r))
                except Exception as e:
                    # Ship the failure back so the caller can re-raise it.
                    connection.send(pickle.dumps(e))
        except EOFError:
            # The peer closed its end: normal end of the session.
            pass
        finally:
            # Release the connection explicitly instead of waiting for
            # garbage collection.
            connection.close()
1.3、Proxy
import pickle
class RPCProxy:
    """Client-side stand-in that turns attribute calls into pickled RPCs."""

    def __init__(self, connection):
        self._connection = connection

    def __getattr__(self, name):
        """Return a callable that invokes the remote function ``name``."""

        def do_rpc(*args, **kwargs):
            # Request format: pickled (function name, positional, keyword).
            request = pickle.dumps((name, args, kwargs))
            self._connection.send(request)
            reply = pickle.loads(self._connection.recv())
            if not isinstance(reply, Exception):
                return reply
            # An exception instance as the reply is raised locally.
            raise reply

        return do_rpc
1.4、Client
from multiprocessing.connection import Client
from chapter11.rpc_proxy import RPCProxy
# Connect to the RPC server. The ``with`` block closes the connection even
# though the final call below raises.
with Client(('localhost', 17000), authkey=b'peekaboo') as c:
    proxy = RPCProxy(c)
    print(f'add(3, 5) = {proxy.add(3, 5)}')
    print(f'sub(5, 12) = {proxy.sub(5, 12)}')
    # A list minus an int fails inside the remote ``sub``; the proxy raises
    # the exception object sent back by the server.
    proxy.sub([1, 2], 4)
1.5、JSON实现
1.5.1、Server
import json
class RPCHandler:
    """Dispatch JSON-encoded RPC requests to functions registered by name."""

    def __init__(self):
        # Maps each registered function's ``__name__`` to the callable.
        self._functions = {}

    def register_function(self, func):
        """Make ``func`` callable by clients under ``func.__name__``."""
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        """Serve requests arriving on ``connection`` until the peer hangs up.

        Each request is a JSON array ``[func_name, args, kwargs]``. The reply
        is the JSON-encoded return value. If the call (or encoding its
        result) raises, the reply is the JSON-encoded ``str()`` of the
        exception, so an error looks the same on the wire as a string result.

        Args:
            connection: Object with ``recv()``, ``send()`` and ``close()``;
                ``recv()`` is expected to raise ``EOFError`` at end of stream.
        """
        try:
            while True:
                # Receive a message
                func_name, args, kwargs = json.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args, **kwargs)
                    connection.send(json.dumps(r))
                except Exception as e:
                    connection.send(json.dumps(str(e)))
        except EOFError:
            # The peer closed its end: normal end of the session.
            pass
        finally:
            # Release the connection explicitly instead of waiting for
            # garbage collection.
            connection.close()
1.5.2、Client
import json
class RPCProxy:
    """Client-side stand-in that turns attribute calls into JSON RPCs."""

    def __init__(self, connection):
        self._connection = connection

    def __getattr__(self, name):
        """Return a callable that invokes the remote function ``name``."""

        def do_rpc(*args, **kwargs):
            # Request format: JSON array of [function name, args, kwargs].
            request = json.dumps((name, args, kwargs))
            self._connection.send(request)
            # The decoded reply is returned as-is; nothing is raised here.
            return json.loads(self._connection.recv())

        return do_rpc
1.6、XML-RPC实现
from xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer:
    """In-memory key/value store published over XML-RPC."""

    # Names of the methods registered with the XML-RPC server.
    _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']

    def __init__(self, address):
        """Create the store and an XML-RPC server bound to ``address``."""
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for method_name in self._rpc_methods_:
            bound_method = getattr(self, method_name)
            self._serv.register_function(bound_method)

    def get(self, name):
        """Return the value stored under ``name`` (``KeyError`` if absent)."""
        return self._data[name]

    def set(self, name, value):
        """Store ``value`` under ``name``, replacing any previous value."""
        self._data[name] = value

    def delete(self, name):
        """Remove ``name`` from the store (``KeyError`` if absent)."""
        del self._data[name]

    def exists(self, name):
        """Return whether ``name`` is currently stored."""
        return name in self._data

    def keys(self):
        """Return the stored names as a list."""
        return [*self._data]

    def serve_forever(self):
        """Hand control to the underlying server's request loop."""
        self._serv.serve_forever()
if __name__ == '__main__':
    # Empty host string with port 15000; serve_forever() blocks in the
    # server's request loop.
    kvserv = KeyValueServer(address=('', 15000))
    kvserv.serve_forever()
from xmlrpc.client import ServerProxy
# SimpleXMLRPCServer serves plain HTTP, so the proxy URL must use http://
# (an https:// URL would attempt a TLS handshake the server cannot answer).
s = ServerProxy('http://localhost:15000', allow_none=True)
s.set('foo', 'bar')
s.set('spam', [1, 2, 3])
s.keys()          # -> ['foo', 'spam']
s.get('foo')      # -> 'bar'
s.get('spam')     # -> [1, 2, 3]
s.delete('spam')
s.exists('spam')  # -> False
二、gRPC框架
2.1、安装
# Install the gRPC runtime and the protoc code-generation plugin
sudo python3 -m pip install grpcio
sudo python3 -m pip install grpcio-tools
# Generate product_info_pb2.py and product_info_pb2_grpc.py, the modules the
# server and client import; the .proto file name determines those names.
python3 -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/product_info.proto
2.2、Server
from concurrent import futures
import logging
import uuid
import grpc
import time
import product_info_pb2
import product_info_pb2_grpc
class ProductInfoServicer(product_info_pb2_grpc.ProductInfoServicer):
    """In-memory implementation of the ProductInfo service."""

    def __init__(self):
        # Maps product id (str) to the stored product message.
        self.productMap = {}

    def addProduct(self, request, context):
        """Store ``request`` under a fresh UUID and return that id.

        The generated id is also written into ``request.id`` before the
        message is stored.
        """
        product_id = str(uuid.uuid1())
        request.id = product_id
        print("addProduct:request", request)
        self.productMap[product_id] = request
        response = product_info_pb2.ProductID(value=product_id)
        print("addProduct:response", response)
        return response

    def getProduct(self, request, context):
        """Return the product stored under ``request.value``.

        An unknown id is reported to the client as ``NOT_FOUND`` instead of
        letting a ``KeyError`` escape from the handler.
        """
        print("getProduct:request", request)
        product_id = str(request.value)
        response = self.productMap.get(product_id)
        if response is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"product {product_id!r} not found")
            # An empty message is still returned to satisfy the handler's
            # return contract alongside the error status.
            return product_info_pb2.Product()
        print("getProduct:response", response)
        return response
# Build a gRPC server whose handlers run on a pool of 10 worker threads
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# Attach the servicer with the generated helper
# `add_ProductInfoServicer_to_server`
product_info_pb2_grpc.add_ProductInfoServicer_to_server(ProductInfoServicer(), server)

# Listen, without TLS, on port 50051
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()

# server.start() returns immediately, so keep the main thread alive by
# sleeping in a loop until Ctrl-C, then stop the server with no grace period
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    server.stop(0)
2.3、Client
import grpc
import product_info_pb2
import product_info_pb2_grpc
import time;
def run():
    """Add one product through the ProductInfo service, then fetch it back.

    Both responses are printed. The channel is closed when the function
    returns or raises.
    """
    # Open an insecure (non-TLS) channel; ``with`` closes it on exit
    with grpc.insecure_channel('localhost:50051') as channel:
        # Create a stub (client) bound to the channel
        stub = product_info_pb2_grpc.ProductInfoStub(channel)
        new_product = product_info_pb2.Product(
            name="Apple iPhone 11",
            description="Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode.",
            price=699.0,
        )
        response = stub.addProduct(new_product)
        print("add product: response", response)
        # Look the product up again using the id the server returned
        product_info = stub.getProduct(product_info_pb2.ProductID(value=response.value))
        print("get product: response", product_info)
# Run the client demo as soon as this module is executed (no __main__ guard)
run()