天天看點

zookeeper實作高可用

原來的項目是透過 main 方法直接啟動的 jar 包,本身不具備高可用能力。根據上司要求,需要在原有入口前面再包一層,利用 ZooKeeper 選出唯一的運作節點來實現高可用。具體請看代碼:

1、導入Jar包:

<!-- The only dependency this article adds. HAMain imports org.apache.zookeeper.* directly;
     presumably that client is resolved transitively through Curator (TODO confirm for your build). -->
<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.7.1</version>
		</dependency>
           

2、編寫高可用啟動類(完整代碼見下方 HAMain.java):

HAMain.java

package com.chinaunicom;

import java.util.List;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chinaunicom.util.FileReadUtil;

public class HAMain {

	private static Logger log = LoggerFactory.getLogger(HAMain.class.getName());
	// 在zk上注冊的機器節點清單
		public static SortedSet<String> servers;
		// 由zk自動遞增配置設定的節點id
		public static String myNodeID;

		private ZooKeeper zk;
		private final Stat stat = new Stat();
		// 應用的頂層目錄
		private  String spath = "/dtsEs";
		// 分隔符
		private final String delimiter = "/";
		//topicName
		private static  String topicName="";
		 //屬性檔案
		 private static Properties prop;
		 
		 private static boolean isUP=false;
		
		private CountDownLatch connectedSignal = new CountDownLatch(1);

		/**
		 * 主程式,讀取zookeeper配置資訊,連接配接zookeeper
		 * 監聽節點變化資訊。如果節點變化了,則判斷目前節點是否為最小節點,如果是最小節點,則運作主程式
		 * @param id
		 * @throws Exception
		 */
		public HAMain(String id) throws Exception {
			FileReadUtil propBean = new FileReadUtil();
			//獲得zookeeper配置資訊
			prop = propBean.getProperties("dts_es.properties");
			String connectString=prop.getProperty("zookeeper.connectString");
			int sessionTimeOut=Integer.parseInt(prop.getProperty("zookeeper.sessionTimeout"));
			try {
				// 建立一個與伺服器的連接配接
				zk = new ZooKeeper(connectString, sessionTimeOut , new Watcher() {
					@Override
					public void process(WatchedEvent event) {
						log.info("-------node Change:" + event);
						// 如果發生了spath節點下的子節點變化事件, 更新server清單, 并重新注冊監聽
						if (event.getType() == EventType.NodeChildrenChanged
								&& spath.equals(event.getPath())) {
							try {
								updateServerList();
							} catch (Exception e) {
								e.printStackTrace();
							}
						}
					}
				});
				createParentDirectory(id);
				createAppNode(id);
				updateServerList();
           
//此處開始阻塞
				connectedSignal.await();
           
//如果滿足條件,則開始執行任務
				log.info("我開始做任務啦!~~~myNodeID:"+HAMain.myNodeID);
				ClusterClientCore coreMain=new ClusterClientCore();
				String param[]={topicName};
				coreMain.main(param);
				log.error("我挂啦!~~~myNodeID:"+HAMain.myNodeID);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		/**
		 * 建立一個子目錄節點
		 * @param id
		 * @throws Exception
		 */
		public void createAppNode(String id) throws Exception {
			myNodeID = zk.create(spath + delimiter, id.getBytes("utf-8"),
					Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			myNodeID = myNodeID.substring(myNodeID.lastIndexOf('/') + 1);
		}

		/**
		 * 如果不存在則建立頂層目錄
		 * @param id
		 * @throws Exception
		 */
		public void createParentDirectory(String id) throws Exception {
			spath=spath+delimiter+id;
			Stat stat = null;
			try {
				stat = zk.exists(spath, true);
			} catch (Exception e) {
				e.printStackTrace();
			}
			if (stat == null) {
				zk.create(spath, id.getBytes("utf-8"),
						Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		}

		/**
		 * 更新伺服器節點清單
		 * @throws Exception
		 */
		public void updateServerList() throws Exception {
			SortedSet<String> set = new TreeSet<String>();
			// 擷取并監聽spath的子節點變化
			// watch參數為true, 表示監聽子節點變化事件.
			// 每次都需要重新注冊監聽, 因為一次注冊, 隻能監聽一次事件, 如果還想繼續保持監聽, 必須重新注冊
			List<String> subList = zk.getChildren(spath, true);
			for (String subNode : subList) {
				// 擷取每個子節點下關聯的server位址
//				 byte[] data = zk.getData(spath + delimiter + subNode, false, stat);
//				 log.info(subNode + "\t" + stat);
//				 String sdata = new String(data, "utf-8");
				set.add(subNode);
			}
			servers = set;
			 // 取消阻塞服務
			cancelAwait();
		}
		
		/**
		 * 如果目前是最小的節點,則取消阻塞
		 */
		public void cancelAwait() {
			String minNode = HAMain.servers.first();
			log.info("目前節點:"+HAMain.myNodeID + ",最小的節點:  " + minNode);
			if (HAMain.myNodeID.equals(minNode)) {// 驗證本機是否是最小節點
				if(!isUP){//如果沒有啟動過
					isUP=true;
				connectedSignal.countDown();
				}
			}
		}

		// 關閉連接配接
		public void close() throws InterruptedException {
			zk.close();
		}
		/**
		 * chengxu rukou  
		 * @param args
		 * @throws Exception
		 */
		public static void main(String[] args) throws Exception {
			String nodeid = "autoid";
			if (args.length == 1) {
				nodeid = args[0];
				topicName=nodeid;
			}
			new HAMain(nodeid);
		}
}
           

檔案工具類:主要讀取配置檔案

FileReadUtil:

package com.chinaunicom.util;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.alibaba.fastjson.JSON;


/**
 * Helper for reading classpath resources: table JSON descriptors, raw stream
 * contents and {@code .properties} files.
 *
 * @author zsh
 */
public class FileReadUtil {

    private static final Logger log = LoggerFactory.getLogger(FileReadUtil.class);

    /**
     * Manual smoke test: resolves the table JSON resources.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        FileReadUtil f = new FileReadUtil();
        f.getFileList();
    }

    /**
     * Resolves the JSON resources matching {@code classpath*:tableJson/*.json},
     * falling back to {@code classpath*:resources/tableJson/*.json} when the
     * first pattern matches nothing.
     *
     * @return the matching resources; {@code null} if the first lookup fails
     *         with an {@link IOException}
     */
    public Resource[] getFileList() {
        Resource[] workflowResources = null;
        try {
            final PathMatchingResourcePatternResolver pmrpr =
                    new PathMatchingResourcePatternResolver(FileReadUtil.class.getClassLoader());
            workflowResources = pmrpr.getResources("classpath*:tableJson/*.json");
            if (workflowResources == null || workflowResources.length == 0) {
                workflowResources = pmrpr.getResources("classpath*:resources/tableJson/*.json");
            }
        } catch (IOException e1) {
            log.error("Failed to resolve the tableJson/*.json resources", e1);
        }
        return workflowResources;
    }

    /**
     * Lists the names of the entries in a directory.
     *
     * @param fileName path of the directory to list
     * @return the entry names as returned by {@link File#list()}, which is
     *         {@code null} when the path is not a directory or cannot be read
     */
    public String[] getFileName(String fileName) {
        File file = new File(fileName);
        return file.list();
    }

    /**
     * Reads the whole stream and decodes it as UTF-8. Despite its name the
     * method does not process the input line by line. The stream is always
     * closed before returning.
     *
     * @param inputStream stream to drain; closed by this method
     * @return the decoded content, or an empty string when the stream is
     *         {@code null} or reading fails
     */
    public String readFileByLines(InputStream inputStream) {
        if (inputStream == null) {
            log.error("readFileByLines was called with a null stream");
            return "";
        }
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        try {
            byte[] data = new byte[1024];
            int count;
            while ((count = inputStream.read(data, 0, data.length)) != -1) {
                outStream.write(data, 0, count);
            }
            return new String(outStream.toByteArray(), "utf-8");
        } catch (IOException e) {
            log.error("Failed to read the input stream", e);
            return "";
        } finally {
            // Close on every path so a failed read does not leak the stream.
            closeQuietly(inputStream);
        }
    }

    /**
     * Loads a properties file from the classpath.
     *
     * @param proName classpath-relative name of the properties resource
     * @return the loaded properties; possibly incomplete when reading fails
     *         part-way (the failure is logged)
     * @throws IllegalArgumentException if the resource is not on the classpath
     */
    public Properties getProperties(String proName) {
        Properties props = new Properties();
        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(proName);
        if (inputStream == null) {
            // Previously this surfaced as a bare NullPointerException from Properties.load.
            throw new IllegalArgumentException("Properties resource not found on the classpath: " + proName);
        }
        try {
            props.load(inputStream);
            // Success message: INFO, not ERROR.
            log.info("加載配置檔案屬性實體類初始化成功!");
        } catch (IOException e) {
            log.error("Failed to load properties resource " + proName, e);
        } finally {
            closeQuietly(inputStream);
        }
        return props;
    }

    /** Closes the stream, logging (not propagating) any failure. */
    private static void closeQuietly(InputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            log.warn("Failed to close the input stream", e);
        }
    }
}

           

代碼配置完畢。然後在 pom.xml 中把 main 方法的執行入口改為 HAMain。

<!-- Packages the project as a runnable jar whose manifest entry point is HAMain. -->
<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<archive>
						<manifest>
							<!-- Writes a Class-Path entry into the manifest; entries are prefixed with lib/,
							     so the dependency jars are presumably expected in a lib/ folder next to the jar (verify). -->
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
							<!-- "com.**" is a masked placeholder: use the real fully-qualified class name.
							     The HAMain shown in this article declares package com.chinaunicom. -->
							<mainClass>com.**.HAMain</mainClass>
						</manifest>
					</archive>
				</configuration>
			</plugin>
		</plugins>

	</build>
           

至此,高可用配置完畢。把項目打完jar包,放在Linux下,執行 

java -jar ***-0.0.1-SNAPSHOT.jar params > 1.log &    即可執行(params 是傳給 main 方法的唯一參數,同時用作 ZooKeeper 節點 id 與 topicName)。

也可以在指令前面增加 nohup 在背景執行。例如:nohup java -jar dtsEs-0.0.1-SNAPSHOT.jar rdsa365i83st0q3h8yi5 > 1.log &