天天看点

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

一、Zookeeper是什么?

分布式开源框架,解决分布式协调工具

二、Zookeeper 应用场景?

1、rpc远程调用框架 + Zookeeper 注册中心命名服务)

2、发布订阅(wathcher)

3、负载均衡

4、分布式通知(wathcher)

5、Zookeeper 分布式锁 ,redis,springCloud 也可以实现

6、使用Zookeeper 做分布式配置中心

7、分布式队列(不建议)

三、Zookeeper 数据结构?

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

1、层次化的目录结构,命名符合常规文件系统规范(类似文件系统)

2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识

3、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)

节点类型

a、Znode有两种类型:

1、短暂(ephemeral)

(create -e /app1/test1 “test1” 客户端断开连接zk删除ephemeral类型节点) /

2、持久(persistent)

(create -s /app1/test2 “test2” 客户端断开连接zk不删除persistent类型节点)

b、Znode有四种形式的目录节点

(默认是persistent )

PERSISTENT                    持久化节点
PERSISTENT_SEQUENTIAL         顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
EPHEMERAL                     临时节点, 客户端session超时这类节点就会被自动删除
EPHEMERAL_SEQUENTIAL          临时自动编号节点
           

四、Zookeeper 实现原理?

1、发布订阅(wathcher)

类似于mq,Zookeeper 可以监听节点的增删改操作,一旦有新消息可以立即获取

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

2、rpc远程调用框架 + Zookeeper 注册中心命名服务 ,分布式通知(wathcher)

服务提供者把地址注册到注册中心(Zookeeper 持久节点)

–> 消费者监听注册中心

–> 消费者获得提供者注册到注册中心的rpc调用地址

–> 消费者调用提供者的rpc接口

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

五、Zookeeper 安装(win+linux)和图形化界面工具

点击跳转 Zookeeper 安装(win+linux)和图形化界面工具

六 java 操作Zookeeper 及 wathcher监听

pom.xml

<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
		</dependency>
           

节点权限

// OPEN_ACL_UNSAFE : 完全开放的ACL,任何连接的客户端都可以操作该属性znode

// CREATOR_ALL_ACL : 只有创建者才有ACL权限

// READ_ACL_UNSAFE:只能读取ACL

节点类型

// PERSISTENT 持久化节点

// PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1

// EPHEMERAL 临时节点, 客户端session超时这类节点就会被自动删除

// EPHEMERAL_SEQUENTIAL 临时自动编号节点

初次连接示例代码

package com.itmayiedu.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZookeeperDemo {
	/**
	 * 集群连接地址
	 */
	private static final String CONNECT_ADDR = "127.0.0.1:2181";
	/**
	 * session超时时间
	 */
	private static final int SESSION_OUTTIME = 2000;
	/**
	 * 信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号,设置值为1
	 */
	private static final CountDownLatch countDownLatch = new CountDownLatch(1);

	public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
		ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() {

			public void process(WatchedEvent event) {
				// 获取时间的状态
				KeeperState keeperState = event.getState();
				//事件类型
				EventType tventType = event.getType();
				// 如果是建立连接
				if (KeeperState.SyncConnected == keeperState) {
					//连接事件
					if (EventType.None == tventType) {
						// 如果建立连接成功,则发送信号量,让后阻塞程序向下执行,设置值为0
						countDownLatch.countDown();
						System.out.println("zk 建立连接");
					}
				}
			}
		});
		// 进行阻塞,当值等于0,放行
		countDownLatch.await();
		//key / value  / 权限  / 节点类型  ( key不能重复)
		String result = zk.create("/ryao", "wangsong".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		System.out.println("result:" + result);
		//等待三秒
		Thread.sleep(3000);
		//关闭连接,临时节点数据将自动删除
		zk.close();
	}
}
           

代码封装实现增删改 + watched 监听

package com.itmayiedu.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

@SuppressWarnings("ALL")
public class ZkClientWatcher implements Watcher {

	// zk连接地址
	private static final String CONNECT_ADDRES = "127.0.0.1:2181";
	// 会话超时时间
	private static final int SESSIONTIME = 2000;
	// 信号量,让zk在连接之前等待,连接成功后才能往下走.
	private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    // zk对象
	private ZooKeeper zk;

	/**
	 * 连接zk
	 * @param connectAddres
	 * @param sessionTimeOut
	 */
	public void createConnection(String connectAddres, int sessionTimeOut) {
		try {
			zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
			//连接等待,监听到连接成功后继续执行
			countDownLatch.await();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 创建节点
	 * @param path
	 * @param data
	 * @param persistent 节点类型
	 * @return
	 */
	public boolean createPath(String path, String data,CreateMode persistent) {
		try {
			this.exists(path, true);
			this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE,persistent );
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
		return true;
	}

	/**
	 * 修改节点
	 * @param path
	 * @param data
	 */
	public boolean updateNode(String path, String data) throws KeeperException, InterruptedException {
		this.exists(path, true);
		this.zk.setData(path, data.getBytes(), -1);
		return false;
	}

	/**
	 * 删除节点
	 * @param path
	 * @param data
	 */
	public boolean deleteNode(String path) throws KeeperException, InterruptedException {
		this.exists(path, true);
		this.zk.delete(path,-1);
		return false;
	}

	
	/**
	 * 判断指定节点是否存在(存在监听节点)
	 * @param path 节点路径
	 */
	public Stat exists(String path, boolean needWatch) {
		try {
			return this.zk.exists(path, needWatch);
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 监听连接 、 监听增删改
	 * @param watchedEvent
	 */
	public void process(WatchedEvent watchedEvent) {
		// 获取事件状态
		KeeperState keeperState = watchedEvent.getState();
		// 获取事件类型
		EventType eventType = watchedEvent.getType();
		// zk 路径
		String path = watchedEvent.getPath();
		//System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
		// 判断是否建立连接
		if (KeeperState.SyncConnected == keeperState) {
			if (EventType.None == eventType) {
				System.out.println( "zk 建立连接成功!");
				// 如果建立建立成功,让后程序往下走
				countDownLatch.countDown();
			} else if (EventType.NodeCreated == eventType) {
				System.out.println("事件通知,新增node节点" + path);
			} else if (EventType.NodeDataChanged == eventType) {
				System.out.println("事件通知,当前node节点" + path + "被修改....");
			} else if (EventType.NodeDeleted == eventType) {
				System.out.println("事件通知,当前node节点" + path + "被删除....");
			}
		}
		System.out.println("--------------------------------------------------------");
	}

	/**
	 * 测试
	 * @param args
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws KeeperException, InterruptedException {
		//获得zkClientWatcher对象
		ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
		//建立连接(连接地址/超时时间)
		zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
		//添加节点(key | val | 节点类型)
		boolean createResult = zkClientWatcher.createPath("/p17", "pa-644064", CreateMode.PERSISTENT);
		//修改节点
		zkClientWatcher.updateNode("/p17","7894561");
		//删除节点
		zkClientWatcher.deleteNode("/p17");
	}
}

           

运行结果,大家可以先注掉删除,用图形化工具查看数据

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

七、Zookeeper - 分布式锁实现

分布式锁实现原理

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行; 
2、高可用的获取锁与释放锁; 
3、高性能的获取锁与释放锁; 
4、具备可重入特性; 
5、具备锁失效机制,防止死锁; 
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
           
Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

Zookeeper分布式锁步骤

1、获取锁时创建临时节点,创建成功执行代码块,执行完关闭连接,自动释放锁

2、创建失败表示锁被其他jvm线程占用,进入线程等待状态

3、等待状态时监听Zookeeper节点Node删除事件,一旦监听到表示锁被其他线程释放,让其重新强占锁资源

问题分析:

1、如果1毫秒能完成两次占锁,释放锁,就是一毫秒完成了两个用户的操作,那么时间戳依旧有几率会重复

2、生成业务订单的 count 分布式项目不在一个jvm里,都是0开始,也有几率重复

3、如果让id 唯一不重复,那么让其生成订单id 的线程延迟 1 毫秒,那么时间戳不可能会有重复数据,

4、缺点:线程延迟 1 毫秒,1秒最多只能生成1000个订单

1、pom.xml

<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.10</version>
		</dependency>
           

2、自定义分布式锁接口 Lock

package com.ws.service;

/**
 * lock 锁 定义分布式锁
 */
public interface Lock {

	/**
	 * 获取锁
	 */
	void getLock();
	/**
	 * 釋放鎖
	 */
	void unLock();
}

           

3、自定义分布式锁接口 --> 抽象类 ZookeeperAbstractLock

package com.ws.service;

import org.I0Itec.zkclient.ZkClient;

import java.util.concurrent.CountDownLatch;

/**
 * 重构重复代码,将重复代码交给子类执行
 */
public abstract class ZookeeperAbstractLock implements Lock {
	/**
	 * zk连接地址
	 */
	private static final String CONNECTSTRING = "127.0.0.1:2181";
	/**
	 * 创建zk连接
	 */
	protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
	/**
	 * 创建zk锁Node节点名称
	 */
	protected static final String PATH = "/lock";
	/**
	 * 信号量
	 */
	protected CountDownLatch countDownLatch = null;

	/**
	 * 获取锁
	 */
	public void getLock() {
		if (tryLock()) {
			System.out.println("###获取锁成功#####");
		} else {
			// 等待
			waitLock();
			// 重新获取锁
			getLock();
		}
	}
	/**
	 * 释放锁
	 */
	public void unLock() {
		if (zkClient != null) {
			zkClient.close();
			System.out.println("释放锁资源");
		}
	}
	/**
	 * 是否获取锁成功,成功返回true 失败返回fasle(子类实现)
	 * @return
	 */
	abstract Boolean tryLock();
	/**
	 * 等待(子类实现)
	 */
	abstract void waitLock();
}

           

4、自定义分布式锁实现类 ZookeeperDistrbuteLock 实现 抽象类的等代和获得锁方法

package com.ws.service;

import org.I0Itec.zkclient.IZkDataListener;

import java.util.concurrent.CountDownLatch;

@SuppressWarnings("ALL")
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {

	/**
	 * 获得锁
	 * @return
	 */
	@Override
	Boolean tryLock() {
		try {
			//创建临时节点,获得锁
			zkClient.createEphemeral(PATH);
			return true;
		} catch (Exception e) {
			return false;
		}
	}

	/**
	 * 等代锁被释放
	 */
	@Override
	void waitLock() {
		// 使用事件监听,获取到节点被删除,
		IZkDataListener iZkDataListener = new IZkDataListener() {
			// 当节点被删除,唤醒线程,强占锁资源
			public void handleDataDeleted(String dataPath) throws Exception {
				if (countDownLatch != null) {
					// 唤醒线程
					countDownLatch.countDown();
				}
			}
			// 当节点发生改变
			public void handleDataChange(String dataPath, Object data) throws Exception {

			}
		};
		// 注册节点信息监听事件通知(主要监听节点被删除)
		zkClient.subscribeDataChanges(PATH, iZkDataListener);
		if (zkClient.exists(PATH)) {
			// 创建信号量
			countDownLatch = new CountDownLatch(1);
			try {
				// 等待,一旦节点被删除唤醒线程,执行后续逻辑
				countDownLatch.await();
			} catch (Exception e) {
                e.printStackTrace();
			}
		}
		// 删除事件通知
		zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
	}

}


           

5、模拟生成订单号规则

我这里延迟1000方便测试、 Thread.sleep(1000);

修改成1毫秒,可保证时间戳唯一不重复

package com.ws;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 生成订单号规则 使用时间戳+业务id
 */
public class OrderNumGenerator {
	// 业务ID
	private static int count = 0;
	/**
	 * 生成订单号
	 * @return
			 */
	public String getNumber() {
		try {
			Thread.sleep(1000);
		} catch (Exception e) {
			// TODO: handle exception
		}
			SimpleDateFormat simpt = new SimpleDateFormat("yyyyMMddHHmm-ss-SSS");
		return simpt.format(new Date()) + "-" + ++count;
	}
}

           

6、测试 (建议开启main方法运行,查看时间戳是否有重复)

分布式系统下锁住的代码块每次只有一个jvm的一个线程能执行了

package com.ws;

import com.ws.service.Lock;
import com.ws.service.ZookeeperDistrbuteLock;

/**
 * 订单生成调用业务逻辑
 */
public class OrderService implements Runnable {
	/**
	 * 生成订单号对象
	 */
	OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
	/**
	 * 自定义分布式锁对象
	 */
	private Lock lock = new ZookeeperDistrbuteLock();
	/**
	 * 模拟每一个用户
	 */
	public void run() {
		try {
			// 上锁, 单机系统使用--synchronized (this) {
			lock.getLock();

			//==========锁住的代码块开始,每次只有一个jvm线程能运行============================
			//模拟生成订单号
			String number = orderNumGenerator.getNumber();
			System.out.println(Thread.currentThread().getName() + ",##number:" + number);
			//==========锁住的代码块结束,每次只有一个jvm线程能运行===========================
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 釋放鎖
			lock.unLock();
		}
	}


	/**
	 * 测试运行
	 * @param args
	 */
	public static void main(String[] args) {
		System.out.println("##模拟生成订单号开始...");
		for (int i = 0; i < 100; i++) {
			new Thread(new OrderService()).start();
		}
	}
}

           

线程延迟单位秒,查看测试结果秒数,是没重复的,业务逻辑运行时间过长会缺,但不会重复

实际延迟时间改成毫秒就ok了,保证时间戳不重复就行

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

八、Zookeeper-负载均衡

实现流程

1、创建zk 注册地址父节点: 如member(持久节点)

2、创建zksocket 服务端,并把服务端的 ip+端口存放在 zk 注册地址下( 临时节点)

------------zksocket 临时节点名= 端口,

------------zksocket 临时节点值= socket服务ip+端口

------------多个 zksocket 服务端的临时节点在一个注册地址父节点下

3、客户端启动时获取zk连接,并获得zk 注册地址父节点下的所有子节点,并保存到list 集合

4、获取完服务列表后,需要监听服务状态,一旦其中一个服务器挂了,临时节点被自动删除,监听到删除刷新list 集合的服务列表数据

5、发送请求时根据服务列表的数据使用取模算法获取其中一个,让其轮训,也可以get(0),实现主从模式,也可以随便自定义算法的啦

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

下面代码示例

这里使用的socket服务器,web服务也是一起,服务端启动项目把ip+端口注册到注册中心,客户端取注册中心的服务列表数据就好了

zk依赖

<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.8</version>
		</dependency>
           

服务器socket

package com.itmayiedu;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * ServerScoekt服务端
 */
public class ZkServerScoekt implements Runnable {
	/**
	 * socket服务端口
	 */
	private int port = 0;
	/**
	 * zk地址
	 */
	private String ZK_SERVER = "127.0.0.1:2181";
	/**
	 * 构造器-启动服务设置端口号
	 * @param port
	 */
	public ZkServerScoekt(int port) {
		this.port = port;
	}

	public void run() {
		ServerSocket serverSocket = null;
		try {
			//启动socket服务
			serverSocket = new ServerSocket(port);
			//将服务信息注册到注册中心上去,key=server-端口,value= ip+端口(服务器完整地址)
			regServer();
			System.out.println("Server start port:" + port);
			Socket socket = null;
			//监听客户端消息
			while (true) {
				socket = serverSocket.accept();
				new Thread(new ServerHandler(socket)).start();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (serverSocket != null) {
					serverSocket.close();
				}
			} catch (Exception e2) {

			}
		}
	}


	/**
	 * 将服务信息注册到注册中心上去
	 */
	public void regServer() throws UnknownHostException {
		//获得zk连接
		ZkClient zkClient = new ZkClient(ZK_SERVER, 6000, 1000);
		//注册ip+端口临时节点名/注意,必须先创建父节点 /member,否则无法创建成功
		String path = "/member/server-" + port;
		//节点是否存在,存在删除
		if (zkClient.exists(path)) {
			zkClient.delete(path);
		}
		//获得本机ip
		String ip = InetAddress.getLocalHost().getHostAddress();
		String value=ip +":"+ port;
		//创建临时节点
		zkClient.createEphemeral(path, value);
		System.out.println("##服务注册成功###"+value);
	}


	/**
	 * 启动server服务
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		//socket地址
		int port = 18080;
		//创建socket服务
		ZkServerScoekt server = new ZkServerScoekt(port);
		//
		Thread thread = new Thread(server);
		thread.start();
	}
}

           

服务器监听消息打印方法类

package com.itmayiedu;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * 服务端接收消息
 */
public class ServerHandler implements Runnable {
	/**
	 * socket对象
	 */
	private Socket socket;

	public ServerHandler(Socket socket) {
		this.socket = socket;
	}

	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String body = null;
			//打印消息
			while (true) {
				body = in.readLine();
				if (body == null){
					break;
				}else{
					System.out.println("Receive : " + body);
				}
				out.println("Hello, " + body);
			}
		} catch (Exception e) {
			if (in != null) {
				try {
					in.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
			}
			if (out != null) {
				out.close();
			}
			if (this.socket != null) {
				try {
					this.socket.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
				this.socket = null;
			}
		}
	}
}

           

客户端

package com.itmayiedu;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * 客户端
 */
@SuppressWarnings("ALL")
public class ZkServerClient {
	/**
	 * 获取所有的服务地址
	 */
	public static List<String> listServer = new ArrayList<String>();
	/**
	 * socket服务端口
	 */
	private int port = 0;
	/**
	 * zk地址
	 */
	private static String ZK_SERVERS = "127.0.0.1:2181";
	/**
	 * 服务调用次数
	 */
	private static int count = 1;

	/**
	 * 获得注册的所有server服务
	 */
	public static void initServer() {
		//清除所有注册地址
		listServer.clear();
		// zk注册地址父节点
		final String memberServerPath = "/member";
		// 获取zk连接
		final ZkClient zkClient = new ZkClient(ZK_SERVERS, 6000, 1000);
		// 获取所有注册地址下子节点(服务列表)
		List<String> children = zkClient.getChildren(memberServerPath);
		// 保存服务列表到 listServer集合
		for (String p : children) {
			listServer.add((String) zkClient.readData(memberServerPath + "/" + p));
		}
		System.out.println("最新服务信息listServer:" + listServer.toString());
		// 订阅父节点下所有子节点事件
		zkClient.subscribeChildChanges(memberServerPath, new IZkChildListener() {
			// 子节点发生变化
			public void handleChildChange(String parentPath, List<String> childrens) throws Exception {
				//清除所有注册地址
				listServer.clear();
				//重新载入服务列表
				for (String subP : childrens) {
					// 读取子节点value值
					listServer.add((String) zkClient.readData(memberServerPath + "/" + subP));
				}
			}
		});
	}

	/**
	 *
	 * 获取当前server信息
	 * 负载均衡算法
	 * @return
	 */
	public static String getServer() {
		//轮训 --> 使用取模算法,服务调用次数%服务列表数量
		String serverName = listServer.get(count % listServer.size());
		++count;

		//主从模式,永远取第一个服务,当第一个服务挂了,会监听到并从服务列表中移除,从而在get(0),拿到的就是备用机的了
		//String serverName = listServer.get(0);
		return serverName;
	}

	/**
	 * 发送socket消息并获得回复,死方法
	 * @param name
	 */
	public void send(String name) {
		//通过负载均衡算法获得服务列表中的一个服务地址
		String server = ZkServerClient.getServer();
		String[] cfg = server.split(":");
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);
			out.println(name);
			while (true) {
				String resp = in.readLine();
				if (resp == null){
					break;
				}else if (resp.length() > 0) {
					System.out.println("Receive : " + resp);
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (out != null) {
				out.close();
			}
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if (socket != null) {
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}


	public static void main(String[] args) {
		initServer();
		ZkServerClient client = new ZkServerClient();
		BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
		while (true) {
			String name;
			try {
				name = console.readLine();
				if ("exit".equals(name)) {
					System.exit(0);
				}
				client.send(name);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

           

测试结果

我们可以看到客户端发的4条消息依次分发给两台服务器了,每台服务器到2到消息

服务端一

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

服务端二

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

客服端

Zookeeper 的数据结构,java基础操作及实现分布式锁 ,实现本地负载均衡

继续阅读