原來的項目是由main方法直接啟動的jar包,本身無法做到高可用。根據上司要求,在原有入口前面再包一層基於ZooKeeper的選主邏輯來實現高可用:同一個程式可以同時啟動多個實例,每個實例在ZooKeeper上註冊一個臨時順序節點,只有序號最小的節點才會運行原來的主程式,其餘實例阻塞等待。具體請看代碼:
1、導入Jar包:
<!--
  Apache Curator recipes 2.7.1.
  NOTE(review): the HAMain class shown below only imports the plain
  org.apache.zookeeper client API and no Curator classes; presumably the
  ZooKeeper client is resolved transitively through this artifact. Confirm
  with `mvn dependency:tree`.
-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
2、新增高可用的啟動入口類:
HAMain.java
package com.chinaunicom;
import java.util.List;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.chinaunicom.util.FileReadUtil;
public class HAMain {
private static Logger log = LoggerFactory.getLogger(HAMain.class.getName());
// 在zk上注冊的機器節點清單
public static SortedSet<String> servers;
// 由zk自動遞增配置設定的節點id
public static String myNodeID;
private ZooKeeper zk;
private final Stat stat = new Stat();
// 應用的頂層目錄
private String spath = "/dtsEs";
// 分隔符
private final String delimiter = "/";
//topicName
private static String topicName="";
//屬性檔案
private static Properties prop;
private static boolean isUP=false;
private CountDownLatch connectedSignal = new CountDownLatch(1);
/**
* 主程式,讀取zookeeper配置資訊,連接配接zookeeper
* 監聽節點變化資訊。如果節點變化了,則判斷目前節點是否為最小節點,如果是最小節點,則運作主程式
* @param id
* @throws Exception
*/
public HAMain(String id) throws Exception {
FileReadUtil propBean = new FileReadUtil();
//獲得zookeeper配置資訊
prop = propBean.getProperties("dts_es.properties");
String connectString=prop.getProperty("zookeeper.connectString");
int sessionTimeOut=Integer.parseInt(prop.getProperty("zookeeper.sessionTimeout"));
try {
// 建立一個與伺服器的連接配接
zk = new ZooKeeper(connectString, sessionTimeOut , new Watcher() {
@Override
public void process(WatchedEvent event) {
log.info("-------node Change:" + event);
// 如果發生了spath節點下的子節點變化事件, 更新server清單, 并重新注冊監聽
if (event.getType() == EventType.NodeChildrenChanged
&& spath.equals(event.getPath())) {
try {
updateServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
createParentDirectory(id);
createAppNode(id);
updateServerList();
//此處開始阻塞
connectedSignal.await();
//如果滿足條件,則開始執行任務
log.info("我開始做任務啦!~~~myNodeID:"+HAMain.myNodeID);
ClusterClientCore coreMain=new ClusterClientCore();
String param[]={topicName};
coreMain.main(param);
log.error("我挂啦!~~~myNodeID:"+HAMain.myNodeID);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 建立一個子目錄節點
* @param id
* @throws Exception
*/
public void createAppNode(String id) throws Exception {
myNodeID = zk.create(spath + delimiter, id.getBytes("utf-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
myNodeID = myNodeID.substring(myNodeID.lastIndexOf('/') + 1);
}
/**
* 如果不存在則建立頂層目錄
* @param id
* @throws Exception
*/
public void createParentDirectory(String id) throws Exception {
spath=spath+delimiter+id;
Stat stat = null;
try {
stat = zk.exists(spath, true);
} catch (Exception e) {
e.printStackTrace();
}
if (stat == null) {
zk.create(spath, id.getBytes("utf-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 更新伺服器節點清單
* @throws Exception
*/
public void updateServerList() throws Exception {
SortedSet<String> set = new TreeSet<String>();
// 擷取并監聽spath的子節點變化
// watch參數為true, 表示監聽子節點變化事件.
// 每次都需要重新注冊監聽, 因為一次注冊, 隻能監聽一次事件, 如果還想繼續保持監聽, 必須重新注冊
List<String> subList = zk.getChildren(spath, true);
for (String subNode : subList) {
// 擷取每個子節點下關聯的server位址
// byte[] data = zk.getData(spath + delimiter + subNode, false, stat);
// log.info(subNode + "\t" + stat);
// String sdata = new String(data, "utf-8");
set.add(subNode);
}
servers = set;
// 取消阻塞服務
cancelAwait();
}
/**
* 如果目前是最小的節點,則取消阻塞
*/
public void cancelAwait() {
String minNode = HAMain.servers.first();
log.info("目前節點:"+HAMain.myNodeID + ",最小的節點: " + minNode);
if (HAMain.myNodeID.equals(minNode)) {// 驗證本機是否是最小節點
if(!isUP){//如果沒有啟動過
isUP=true;
connectedSignal.countDown();
}
}
}
// 關閉連接配接
public void close() throws InterruptedException {
zk.close();
}
/**
* chengxu rukou
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
String nodeid = "autoid";
if (args.length == 1) {
nodeid = args[0];
topicName=nodeid;
}
new HAMain(nodeid);
}
}
3、檔案工具類,主要用來讀取配置檔案:
FileReadUtil.java
package com.chinaunicom.util;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import com.alibaba.fastjson.JSON;
/**
 * Helper for reading classpath resources: property files, the JSON table
 * definitions and raw input streams.
 *
 * @author zsh
 */
public class FileReadUtil {
    private static final Logger log = LoggerFactory.getLogger(FileReadUtil.class);

    /**
     * Manual smoke test: resolves the table-definition resources and discards the result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        FileReadUtil f = new FileReadUtil();
        f.getFileList();
    }

    /**
     * Finds the JSON table definitions on the classpath.
     *
     * <p>Looks for {@code tableJson/*.json} first and falls back to
     * {@code resources/tableJson/*.json} when nothing matches.
     *
     * @return the matching resources; an empty array when nothing matches or the lookup fails
     */
    public Resource[] getFileList() {
        try {
            PathMatchingResourcePatternResolver resolver =
                    new PathMatchingResourcePatternResolver(FileReadUtil.class.getClassLoader());
            Resource[] found = resolver.getResources("classpath*:tableJson/*.json");
            if (found == null || found.length == 0) {
                found = resolver.getResources("classpath*:resources/tableJson/*.json");
            }
            return (found == null) ? new Resource[0] : found;
        } catch (IOException e) {
            log.error("Failed to resolve the tableJson resources on the classpath", e);
            return new Resource[0];
        }
    }

    /**
     * Lists the names of the entries in a directory.
     *
     * @param fileName path of the directory
     * @return the entry names, or {@code null} when the path is not a readable
     *         directory (the contract of {@link File#list()})
     */
    public String[] getFileName(String fileName) {
        return new File(fileName).list();
    }

    /**
     * Reads the whole stream and decodes it as UTF-8. The stream is always closed,
     * whether or not reading succeeds.
     *
     * @param inputStream stream to drain; may be {@code null}
     * @return the decoded content, or an empty string when the stream is
     *         {@code null} or cannot be read
     */
    public String readFileByLines(InputStream inputStream) {
        if (inputStream == null) {
            log.error("Cannot read from a null input stream");
            return "";
        }
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        try {
            byte[] buffer = new byte[1024];
            int count;
            while ((count = inputStream.read(buffer, 0, buffer.length)) != -1) {
                outStream.write(buffer, 0, count);
            }
            return new String(outStream.toByteArray(), "utf-8");
        } catch (IOException e) {
            log.error("Failed to read the input stream", e);
            return "";
        } finally {
            closeQuietly(inputStream);
        }
    }

    /**
     * Loads a properties file from the classpath.
     *
     * @param proName classpath location of the properties file
     * @return the loaded properties; empty when the resource does not exist, and
     *         possibly incomplete when reading fails part-way
     */
    public Properties getProperties(String proName) {
        Properties props = new Properties();
        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(proName);
        if (inputStream == null) {
            log.error("Properties resource not found on the classpath: " + proName);
            return props;
        }
        try {
            props.load(inputStream);
            log.info("加載配置檔案屬性實體類初始化成功!");
        } catch (IOException e) {
            log.error("Failed to load properties from " + proName, e);
        } finally {
            closeQuietly(inputStream);
        }
        return props;
    }

    /** Closes the stream, logging instead of propagating a failure to close. */
    private static void closeQuietly(InputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            log.warn("Failed to close an input stream", e);
        }
    }
}
代碼編寫完畢。然後在pom.xml中把jar包的啟動入口(mainClass,即main方法所在的類)改為HAMain:
<!--
  Configures maven-jar-plugin to write the jar's MANIFEST so that the jar can
  be started with `java -jar`.
-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<!-- Adds a Class-Path entry to the manifest listing the project's dependencies. -->
<addClasspath>true</addClasspath>
<!-- Each Class-Path entry is prefixed with lib/. NOTE(review): the dependency jars
     presumably have to be copied into a lib/ directory next to the jar by a
     separate step that is not shown here. -->
<classpathPrefix>lib/</classpathPrefix>
<!-- "com.**" is a masked placeholder: use the fully-qualified name of the launcher
     class. The class shown in this article is declared in package com.chinaunicom. -->
<mainClass>com.**.HAMain</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
至此,高可用配置完畢。把項目打成jar包,放到Linux下,執行
java -jar ***-0.0.1-SNAPSHOT.jar params > 1.log &
即可啟動。要真正做到高可用,需要用同樣的參數至少啟動兩個實例(例如分別在兩台機器上):序號最小的實例運行主程式,其餘實例阻塞等待,當前面的節點消失後自動接手。
也可以在指令前面加上 nohup,讓程式在終端退出後仍在背景運行。例如:nohup java -jar dtsEs-0.0.1-SNAPSHOT.jar rdsa365i83st0q3h8yi5 > 1.log &