天天看点

py-eureka-client学习笔记一、初始化eureka server对象二、服务注册三、服务发现

python官方下载地址:https://pypi.org/project/py-eureka-client/

由于公司使用微服务架构,所以在部署服务时需要使用eureka,但整个项目是使用python开发的,所以学习了一波py-eureka-client模块,相较spring cloud下的java版eureka,代码更直观更好理解,适合我这种从没有接触过啥是微服务的新手,跟着官方提供的tutorial,半小时即可撸完代码。如果想进一步知道每一步都在干什么,则可以再花一个小时去了解一下什么是微服务,相信了解清楚eureka具体是什么作用之后,再回头看代码,你会有更深刻的认识。

在此贴一个我自己认为写的不错的入门级资料:https://zhuanlan.zhihu.com/p/48705394

首先,eureka client主要功能有两个:1.服务注册(registry);2.服务发现(discovery)

一、初始化eureka server对象

根据eureka server地址初始化一个eureka server的实例对象,有三种方法:

  1. eureka_client.init()
  2. eureka_client.init_registry_client()
  3. eureka_client.init_discovery_client()

其中,方法一会同时初始化注册和发现两个功能,如果只需要使用单个功能,则用方法二、三即可。

import py_eureka_client.eureka_client as eureka_client

# 服务发现只需要提供一个eureka_server地址参数即可
eureka_client.init_discovery_client("http://192.168.3.116:8761/eureka/, http://192.168.3.116:8762/eureka/")
           

二、服务注册

服务注册就是当我们的服务部署上线时,要把“服务部署在哪(几)台机器上,开放哪个端口,用啥传输协议等balabala”一些信息注册登记到Eureka server中去,这个Eureka server就好像是号码百事通一样,不需要你把每一个服务部署信息都牢记,服务更改了信息也不需要通知到每一个用户,因为有Eureka(服务注册中心)这个大管家统一管理。 

注册的环节实际在初始化的环节提供用于服务发现时的"app_name"以及端口号完成的,最主要java的eureka中每30s发送一次heartbeat证明服务没有挂掉的功能,都已经封装好了,不需要我们去实现,是不是很方便!

import py_eureka_client.eureka_client as eureka_client

your_rest_server_port = 9090
# The flowing code will register your server to eureka server and also start to send heartbeat every 30 seconds
eureka_client.init_registry_client(eureka_server="http://your-eureka-server-peer1,http://your-eureka-server-peer2",
                                app_name="your_app_name",
                                instance_port=your_rest_server_port)
           

三、服务发现

服务发现,是为了当我们需要请求某一个服务的时候,我们不知道这个服务部署在哪,即不知道ip、port等信息,无法直接发送请求,所以这时候就需要通过Eureka server获得该服务的注册信息,拿到该信息以后再请求我们需要的服务。

发现的代码也很简洁,只需要调用eureka_client.do_service()方法即可完成对服务的请求,是不是很方便!当然,由于高度封装会致使我们搞不清楚到底发生了什么,所以来仔细看看源码都做了些什么事情!

def do_service(app_name="", service="", return_type="string",
               prefer_ip=False, prefer_https=False,
               method="GET", headers=None,
               data=None, timeout=_DEFAULT_TIME_OUT,
               cafile=None, capath=None, cadefault=False, context=None):
    # 若已经初始化eureka client,则会得到一个实例cli
    cli = get_discovery_client()
    if cli is None:
        raise Exception("Discovery Client has not initialized. ")
    # 调用该实例的do_service方法
    return cli.do_service(app_name=app_name, service=service, return_type=return_type,
                          prefer_ip=prefer_ip, prefer_https=prefer_https,
                          method=method, headers=headers,
                          data=data, timeout=timeout,
                          cafile=cafile, capath=capath,
                          cadefault=cadefault, context=context)
           

入参:

  1. app_name:需要发现的服务名(注册到eureka server时用的app_name)
  2. service:具体在该服务下需要做的操作,即url
  3. return_type:即请求服务后服务端返回报文response以什么形式返回,默认是string
  4. method,headers,data:发送请求的标准格式
def do_service(self, app_name="", service="", return_type="string",
                   prefer_ip=False, prefer_https=False,
                   method="GET", headers=None,
                   data=None, timeout=_DEFAULT_TIME_OUT,
                   cafile=None, capath=None, cadefault=False, context=None):
    def walk_using_urllib(url):
            req = http_client.Request(url)
            req.get_method = lambda: method
            heads = headers if headers is not None else {}
            for k, v in heads.items():
                req.add_header(k, v)

            res_txt = http_client.load(req, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context)
            if return_type.lower() in ("json", "dict", "dictionary"):
                return json.loads(res_txt)
            else:
                return res_txt
        # 但这段程序里实际执行的是self.walk_nodes()方法。       
        return self.walk_nodes(app_name, service, prefer_ip, prefer_https, walk_using_urllib)
           

首先在实例的do_service方法下又定义了一个walk_using_urllib(url)方法,该方法会通过构建好的url以及method,headers,data等标准请求参数完成对服务的请求,并获得返回报文response。

def walk_nodes(self, app_name="", service="", prefer_ip=False, prefer_https=False, walker=None):
        assert app_name is not None and app_name != "", "application_name should not be null"
        assert inspect.isfunction(walker) or inspect.ismethod(walker), "walker must be a method or function"
        error_nodes = []
        app_name = app_name.upper()
        # 根据app_name从eureka server获得一个服务实例(选择策略默认是随机)
        node = self.__get_available_service(app_name)

        while node is not None:
            try:
                # 根据获得的服务实例node,生成初步的url
                url = self.__generate_service_url(node, prefer_ip, prefer_https)
                # 再跟上服务具体操作service得到完整的url
                if service.startswith("/"):
                    url = url + service[1:]
                else:
                    url = url + service
                _logger.debug("service url::" + url)
                # 此处的walker就是上一步传入的walk_using_urllib()方法
                return walker(url)
            except (http_client.HTTPError, http_client.URLError):
                _logger.warn("do service %s in node [%s] error, use next node." % (service, node.instanceId))
                error_nodes.append(node.instanceId)
                # 若该服务实例被占用或者连不通,则会重新调用__get_availabel_service方法
                # 直到所有的服务实例都不可用,则抛出错误,这步实现了负载均衡
                node = self.__get_available_service(app_name, error_nodes)

        raise http_client.URLError("Try all up instances in registry, but all fail")
           

walk_nodes最重要的四个点就是:

  1. __get_available_service(app_name)用于获取服务实例(ip:port)
  2. __generate_service_url()根据服务实例以及参数service,构建请求的url
  3. 传入参数walker,即之前提到的walk_using_urllib方法,执行对服务的请求工作
  4. 实现了负载均衡的功能

继续阅读