天天看点

consul API,使用API来管理 consul

本次测试了下面几个 api

consul_ip="10.0.26.104"

service_name="base-gateway"

查看consul集群所有 node :

curl "http://${consul_ip}:8500/v1/catalog/nodes" | jq
           
consul API,使用API来管理 consul

查看consul集群所有 service:

curl "http://${consul_ip}:8500/v1/internal/ui/services?dc=dc1" | jq
           

查看consul集群内某个service下的服务详情(服务下的ID可以从这里查看):

curl "http://${consul_ip}:8500/v1/health/service/${service_name}?dc=dc1" | jq
           

删除consul集群内某个ID:

curl -XPUT "http://${consul_ip}:8500/v1/agent/service/deregister/${service_id}"
           

查看consul集群内所有 critical (失效)状态的

ID: "http://${consul_ip}:8500/v1/health/state/critical" | jq
           

查看consul集群内所有 passing (有效)状态的

ID: "http://${consul_ip}:8500/v1/health/state/critical" | jq
           

=============================================

现在可以通过这些API拿到这些信息,那么接下来撸一段儿代码,来利用起来。当前碰到了这种情况,某些地方的 consul 管理页面不在一个内网,且对面没有windows,不给代理。于是就突发奇想,研究一个linux端的管理工具。

使用python写的,为了直接可以放在 centos 上执行,不安装任何依赖包,于是下面代码使用了 python2 ,模块用了已经接近淘汰的 urllib和 urllib2 , 代码如下:

vim manage_consul.py

# -*- coding: utf-8 -*-
import sys
import json
import urllib,urllib2


class ManageConsulServiceNode(object):
    """使用 consul 接口管理服务注册情况"""
    def __init__(self, consul_ip):
        """基本数据"""
        self.consul_ip = consul_ip
        self.get_consul_nodes_url = "http://{consul_ip}:8500/v1/catalog/nodes"
        self.get_consul_service_url = "http://{consul_ip}:8500/v1/internal/ui/services?dc=dc1"
        self.get_consul_service_name_url = "http://{consul_ip}:8500/v1/health/service/{service_name}?dc=dc1"
        self.put_consul_service_id_url = "http://{consul_ip}:8500/v1/agent/service/deregister/{service_id}"
        self.query_consul_critical_url = "http://{consul_ip}:8500/v1/health/state/critical"

    def consul_ip_list(self):
        """获取consul集群节点ip"""
        ip_list = []
        get_consul_nodes_url = self.get_consul_nodes_url.format(consul_ip=self.consul_ip)
        request = urllib2.Request(get_consul_nodes_url)
        response = urllib2.urlopen(request)
        json_data = json.loads(response.read())
        for data_dict in json_data:
            consul_ip = data_dict["Address"]
            ip_list.append(consul_ip)
        return ip_list

    def get_consul_service_status(self):
        """获取consul内所有服务"""
        service_list = []
        service_dict = {}
        get_consul_service_url = self.get_consul_service_url.format(consul_ip=self.consul_ip)
        request = urllib2.Request(get_consul_service_url)
        response = urllib2.urlopen(request)
        json_data = json.loads(response.read())
        for service_dict in json_data:
            service_dict["service_name"] = service_dict["Name"]
            service_dict["service_critical"] = service_dict["ChecksCritical"]
            service_dict["service_passing"] = service_dict["ChecksPassing"]/2
            service_list.append(service_dict)
        return service_list

    def get_consul_service_info(self):
        """获取各服务内节点状态,并打印"""
        print("")
        print("--- consul 集群中当前服务如下: ---")
        print("\t当前 consul 集群节点: {0}\n".format(self.consul_ip_list()))
        for service_name_dict in self.get_consul_service_status():
            service_name = service_name_dict["service_name"]
            if "consul" in service_name:
                continue
            head_mess = "\t\033[1;32m服务名: {0:^20} -存活的节点数: {1}, -失效的节点数: {2}\033[0m".format(
                                                                                           service_name_dict["service_name"],
                                                                                           service_name_dict["service_critical"],
                                                                                           service_name_dict["service_passing"],
                                                                                           chr(250)
                                                                                           )
            print(head_mess)
            get_consul_service_url = self.get_consul_service_name_url.format(consul_ip=self.consul_ip,service_name=service_name)
            request = urllib2.Request(get_consul_service_url)
            response = urllib2.urlopen(request)
            json_data = json.loads(response.read())
            for service_info in json_data:
                service_id = service_info["Service"]["ID"]
                service_ipaddr = service_info["Service"]["Address"]
                service_port = service_info["Service"]["Port"]
                service_status = service_info["Checks"][1]["Status"]
                boday_mess = "\t\t服务ID: {0:^60} 服务地址: {1}:{2} 状态: {3}".format(
                                                                                  service_id,
                                                                                  service_ipaddr,
                                                                                  service_port,
                                                                                  service_status,
                                                                                  chr(250)
                                                                                  )
                print boday_mess
            print("")

    def deregister_consul_service_id(self, service_id):
        """发送 PUT 请求注销 consul 中服务 id"""
        for consul_ip in self.consul_ip_list():
            print("--- 清理{0}节点的: {1}".format(consul_ip, service_id))
            put_consul_service_id_url = self.put_consul_service_id_url.format(consul_ip=consul_ip,service_id=service_id)
            request = urllib2.Request(put_consul_service_id_url)
            request.get_method = lambda: 'PUT'
            ret = urllib2.urlopen(request).read()
            print(ret)

    def deregister_consul_all_critical_id(self, deregister_service=None):
        """注销 consul 内所有 critical 状态的服务 ID"""
        query_consul_critical_url = self.query_consul_critical_url.format(consul_ip=self.consul_ip)
        request = urllib2.Request(query_consul_critical_url)
        response = urllib2.urlopen(request)
        json_data = json.loads(response.read())
        for critical_id_dict in json_data:
            if deregister_service is None:
                critical_id = critical_id_dict["ServiceID"]
                self.deregister_consul_service_id(critical_id)
            else:
                critical_service_name = critical_id_dict["ServiceName"]
                if deregister_service == critical_service_name:
                    critical_id = critical_id_dict["ServiceID"]
                    self.deregister_consul_service_id(critical_id)


if __name__ == "__main__":
    """按参数做相应处理"""
    parameter_sum = len(sys.argv)
    self_name = sys.argv[0]
    if parameter_sum < 2 or parameter_sum == 1:
        query_mess = "列出consul内所有服务和节点: {0} {1}".format(self_name, "[consul ip]")
        deregister_mess = "注销某个服务ID: {0} {1} {2}".format(self_name, "[consul ip]", "[service id]")
        deregister_critical_all_mess = "注销所有服务下的失效ID: {0} {1} {2}".format(self_name, "[consul ip]", "[critical]")
        deregister_critical_service_mess = "注销某个服务下的失效ID: {0} {1} {2} {3}".format(self_name, "[consul ip]", "[critical]", "[service name]")
        print(query_mess)
        print(deregister_mess)
        print(deregister_critical_all_mess)
        print(deregister_critical_service_mess)
        sys.exit(deregister_mess)

    ### 调用 consul api
    ip_parameter = sys.argv[1]
    manage_consul_service = ManageConsulServiceNode(ip_parameter)
    # 带一个参数,直接列出所有服务的节点
    if parameter_sum == 2:
        manage_consul_service.get_consul_service_info()
    # 带两个参数时,第二个参数为 critical 时清理所有失效ID,第二个参数为服务 ID 时只注销当前 ID
    if parameter_sum == 3:
        service_id = sys.argv[2]
        if service_id == "critical":
            manage_consul_service.deregister_consul_all_critical_id()
        else:
            manage_consul_service.deregister_consul_service_id(service_id)
    # 带三个参数时,第二个参数为 critical, 第三个参数为对应的 service 名称,即可注销对应 service 下的所有失效节点
    if parameter_sum == 4:
        service_name = sys.argv[3]
        manage_consul_service.deregister_consul_all_critical_id(service_name)
           

执行效果如下图

空的执行脚本,打印使用方式

consul API,使用API来管理 consul

 列出所有服务和下面的 ID

consul API,使用API来管理 consul

 清理所有已经失效的ID

consul API,使用API来管理 consul

 清理某个没有节点(没有失效的节点也可被清理):

consul API,使用API来管理 consul

 清理某个服务下的失效节点:

consul API,使用API来管理 consul

继续阅读