本周的工作分兩個部分
一:搭建JStorm環境(三個機器組成的叢集)
由于微軟AZure的虛拟機還沒有申請下來,我先在實驗室的環境下搭建了
1. 搭建Zookeeper叢集
a) 下載下傳Zookeeper的3.4.5版本,解壓至/xxx/xxx/zookeeper-3.4.5
b) 配置環境變量(在~/.bashrc)
export ZOOKEEPER_HOME=/xxx/xxx/zookeeper-3.4.5
export PATH=$PATH:$HOME/bin:$ZOOKEEPER_HOME/bin
export CLASSPATH=$CLASSPATH:$ZOOKEEPER_HOME/lib
c) 配置$ZOOKEEPER_HOME/conf/zoo.cfg,主要有
dataDir=/home/yangrenkai/data/zookeeper/data
clientPort=5181
server.1=blade5:2881:3881
server.2=blade7:2881:3881
server.3=blade8:2881:3881
d) 在dataDir下建立一個myid檔案,内容為1或2或3,根據server.x的x來定。
2. 安裝java1.7和python 2.6,因為JStorm是由大量的java和python編寫。
3. 安裝JStorm-0.9.3.1
a) 下載下傳JStorm-0.9.3.1版本,解壓至/xxx/xxx/jstorm-0.9.3.1
b) 配置環境變量(在~/.bashrc)
export JSTORM_HOME=/xxx/xxx/jstorm-0.9.3.1
export PATH=$PATH:$JSTORM_HOME/bin
c) 配置$JSTORM_HOME/conf/storm.yaml
storm.zookeeper.servers: 表示zookeeper 的位址
storm.zookeeper.port: 表示zookeeper的端口
nimbus.host: 表示nimbus的位址
storm.zookeeper.root: 表示jstorm在zookeeper中的根目錄,當多個JStorm共享一個ZOOKEEPER時,需要設定該選項,預設 即為“/jstorm”
storm.local.dir: 表示jstorm臨時資料存放目錄,需要保證jstorm程式對該目錄有寫權限
java.library.path: zeromq 和java zeromq library的安裝目錄,預設"/usr/local/lib:/opt/local/lib:/usr/lib"
supervisor.slots.ports: 表示supervisor 提供的端口slot清單,注意不要和其他端口發生沖突,預設是68xx,而storm的是67xx
supervisor.disk.slot: 表示提供資料目錄,當一台機器有多塊磁盤時,可以提供磁盤讀寫slot,友善有重IO操作的應用
topology.enable.classloader: false, 預設關閉classloader,如果應用的jar與jstorm的依賴的jar發生沖突,比如應用使用thrift9, 但jstorm使用thrift7時,就需要打開classloader
nimbus.groupfile.path: 如果需要做資源隔離,比如資料倉庫使用多少資源,技術部使用多少資源,無線部門使用多少資源時, 就需要打開分組功能, 設定一個配置檔案的絕對路徑,改配置檔案如源碼中group_file.ini所示
storm.local.dir: jstorm使用的本地臨時目錄,如果一台機器同時運作storm和jstorm的話, 則不要共用一個目錄,必須将二者分 離開
d) 在送出topology的節點上輸入指令
#mkdir ~/.jstorm
#cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
e) 先啟動zk,在啟動nimbus和supervisor,而且nimbus和supervisor最好不要在一個節點,我是1個nimbus和2個supervisor,一 個supervisor配置四個port
4. JStorm需要tomcat來展現UI,是以需要安裝tomcat
a) 下載下傳Tomcat8.0.9,解壓至/xxx/xxx/Tomcat-8.0.9
b) 運作指令:
cd /xxx/xxx/Tomcat-8.0.9/webapps/
cp $JSTORM_HOME/jstorm-ui-0.9.3.war ./
mv ROOT ROOT.old
ln -s jstorm-ui-0.9.3 ROOT
c) 啟動,/xxx/xxx/Tomcat-8.0.9/bin/startup.sh
二:寫完TopK_on_JStorm的第一個版本(項目位址)
1. 建立jstorm-topk工程
2. 整個項目提供一個簡單的topk計算流程,由并發度為1的ScoreProduceSpout提供随機數資料(id,score),并發度為4的ComputeBolt提供topk計算,并發度為1的PrintAndStoreBolt的彙總列印。
3. 建立TopKServerTopology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new ScoreProduceSpout(), 1);
builder.setBolt("compute", new ComputeBolt(), 4).shuffleGrouping("spout");
builder.setBolt("print", new PrintAndStoreBolt(), 1).shuffleGrouping(
"compute");
4. 建立ScoreProduceSpout,繼承IRichSpout(細節在下一次周報講述)
_collector.emit(new Values(tupleId, id, score), tupleId);
其中tupleId是long類型從0遞增,id是四位的[0-9a-zA-Z]組成字元,socre是1000000内随機數。而emit方法的tupleId參數代表要和Acker進行通信,達到記錄級的不丢失。(Ack機制在其他博文講述)
5. 建立ComputeBolt,繼承IRichBolt(細節在下一次周報講述)
把原資料集分割4份,并行處理,每個task都計算自己流上的topk,即使task當機或者tuple fail,都重新進行積累計算。在excute()方法實作topK算法,比較複雜,可以看項目位址中的源代碼。
繼承IRichBolt,可以自己控制資料是ack還是繼續往下一個bolt發送(即由下一個bolt來控制ack)
6. 建立PrintAndStoreBolt,繼承IRichBolt(細節在下一次周報講述)
所有的結果在這裡彙總,excute()仍然實作和ComputeBolt類似的算法,不同的是資料量較小(由ComputeBolt過濾了)和列印(為以後的持久化/輸出)留下接口。
7. 在JStorm叢集上運作,jstorm jar topk.jar com.msopentech.jstorm.topk.topology.TopKServerTopology
已經可以完成topk的基本需求。
下周計劃
1. 能在微軟Azure上搭建叢集并運作topk算法(暫時還沒有獲得賬号,正在尋求導師幫助)。
2. topK的算法可以繼續進行提升。
3. REST API輸入的實作。
感謝CSDN開源夏令營和商之狄老師的指導與支援!