python官方下载地址:https://pypi.org/project/py-eureka-client/
由于公司使用微服务架构,所以在部署服务时需要使用eureka,但整个项目是使用python开发的,所以学习了一波py-eureka-client模块,相较spring cloud下的java版eureka,代码更直观更好理解,适合我这种从没有接触过啥是微服务的新手,跟着官方提供的tutorial,半小时即可撸完代码。如果想进一步知道每一步都在干什么,则可以再花一个小时去了解一下什么是微服务,相信了解清楚eureka具体是什么作用之后,再回头看代码,你会有更深刻的认识。
在此贴一个我自己认为写的不错的入门级资料:https://zhuanlan.zhihu.com/p/48705394
首先,eureka client主要功能有两个:1.服务注册(registry);2.服务发现(discovery)
一、初始化eureka server对象
根据eureka server地址初始化一个eureka server的实例对象,有三种方法:
- eureka_client.init()
- eureka_client.init_registry_client()
- eureka_client.init_discovery_client()
其中,方法一会同时初始化注册和发现两个功能,如果只需要使用单个功能,则用方法二、三即可。
import py_eureka_client.eureka_client as eureka_client
# 服务发现只需要提供一个eureka_server地址参数即可
eureka_client.init_discovery_client("http://192.168.3.116:8761/eureka/, http://192.168.3.116:8762/eureka/")
二、服务注册
服务注册就是当我们的服务部署上线时,要把“服务部署在哪(几)台机器上,开放哪个端口,用啥传输协议等balabala”一些信息注册登记到Eureka server中去,这个Eureka server就好像是号码百事通一样,不需要你把每一个服务部署信息都牢记,服务更改了信息也不需要通知到每一个用户,因为有Eureka(服务注册中心)这个大管家统一管理。
注册的环节实际在初始化的环节提供用于服务发现时的"app_name"以及端口号完成的,最主要java的eureka中每30s发送一次heartbeat证明服务没有挂掉的功能,都已经封装好了,不需要我们去实现,是不是很方便!
import py_eureka_client.eureka_client as eureka_client
your_rest_server_port = 9090
# The flowing code will register your server to eureka server and also start to send heartbeat every 30 seconds
eureka_client.init_registry_client(eureka_server="http://your-eureka-server-peer1,http://your-eureka-server-peer2",
app_name="your_app_name",
instance_port=your_rest_server_port)
三、服务发现
服务发现,是为了当我们需要请求某一个服务的时候,我们不知道这个服务部署在哪,即不知道ip、port等信息,无法直接发送请求,所以这时候就需要通过Eureka server获得该服务的注册信息,拿到该信息以后再请求我们需要的服务。
发现的代码也很简洁,只需要调用eureka_client.do_service()方法即可完成对服务的请求,是不是很方便!当然,由于高度封装会致使我们搞不清楚到底发生了什么,所以来仔细看看源码都做了些什么事情!
def do_service(app_name="", service="", return_type="string",
prefer_ip=False, prefer_https=False,
method="GET", headers=None,
data=None, timeout=_DEFAULT_TIME_OUT,
cafile=None, capath=None, cadefault=False, context=None):
# 若已经初始化eureka client,则会得到一个实例cli
cli = get_discovery_client()
if cli is None:
raise Exception("Discovery Client has not initialized. ")
# 调用该实例的do_service方法
return cli.do_service(app_name=app_name, service=service, return_type=return_type,
prefer_ip=prefer_ip, prefer_https=prefer_https,
method=method, headers=headers,
data=data, timeout=timeout,
cafile=cafile, capath=capath,
cadefault=cadefault, context=context)
入参:
- app_name:需要发现的服务名(注册到eureka server时用的app_name)
- service:具体在该服务下需要做的操作,即url
- return_type:即请求服务后服务端返回报文response以什么形式返回,默认是string
- method,headers,data:发送请求的标准格式
def do_service(self, app_name="", service="", return_type="string",
prefer_ip=False, prefer_https=False,
method="GET", headers=None,
data=None, timeout=_DEFAULT_TIME_OUT,
cafile=None, capath=None, cadefault=False, context=None):
def walk_using_urllib(url):
req = http_client.Request(url)
req.get_method = lambda: method
heads = headers if headers is not None else {}
for k, v in heads.items():
req.add_header(k, v)
res_txt = http_client.load(req, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context)
if return_type.lower() in ("json", "dict", "dictionary"):
return json.loads(res_txt)
else:
return res_txt
# 但这段程序里实际执行的是self.walk_nodes()方法。
return self.walk_nodes(app_name, service, prefer_ip, prefer_https, walk_using_urllib)
首先在实例的do_service方法下又定义了一个walk_using_urllib(url)方法,该方法会通过构建好的url以及method,headers,data等标准请求参数完成对服务的请求,并获得返回报文response。
def walk_nodes(self, app_name="", service="", prefer_ip=False, prefer_https=False, walker=None):
assert app_name is not None and app_name != "", "application_name should not be null"
assert inspect.isfunction(walker) or inspect.ismethod(walker), "walker must be a method or function"
error_nodes = []
app_name = app_name.upper()
# 根据app_name从eureka server获得一个服务实例(选择策略默认是随机)
node = self.__get_available_service(app_name)
while node is not None:
try:
# 根据获得的服务实例node,生成初步的url
url = self.__generate_service_url(node, prefer_ip, prefer_https)
# 再跟上服务具体操作service得到完整的url
if service.startswith("/"):
url = url + service[1:]
else:
url = url + service
_logger.debug("service url::" + url)
# 此处的walker就是上一步传入的walk_using_urllib()方法
return walker(url)
except (http_client.HTTPError, http_client.URLError):
_logger.warn("do service %s in node [%s] error, use next node." % (service, node.instanceId))
error_nodes.append(node.instanceId)
# 若该服务实例被占用或者连不通,则会重新调用__get_availabel_service方法
# 直到所有的服务实例都不可用,则抛出错误,这步实现了负载均衡
node = self.__get_available_service(app_name, error_nodes)
raise http_client.URLError("Try all up instances in registry, but all fail")
walk_nodes最重要的四个点就是:
- __get_available_service(app_name)用于获取服务实例(ip:port)
- __generate_service_url()根据服务实例以及参数service,构建请求的url
- 传入参数walker,即之前提到的walk_using_urllib方法,执行对服务的请求工作
- 实现了负载均衡的功能