天天看點

Hadoop-Day7_sqoop_storm

Hadoop-Day7_sqoop_storm
2015年8月22日
傳智.司馬炎
Hadoop-Day7_sqoop_storm

Hadoop Day7

1.Sqoop

Ø Sqoop是什麼?(****了解*****)

Sqoop是一款開源的工具,主要用于在HADOOP(Hive)與傳統的資料庫(mysql、 postgresql...)間進行資料的傳遞,可以将一個關系型資料庫(例如 : MySQL ,Oracle ,Postgres等)中的資料導進到Hadoop的HDFS中,也可以将HDFS的資料導進到關系型資料庫中。

Sqoop項目開始于2009年,最早是作為Hadoop的一個第三方子產品存在,後來為了讓使用者能夠快速部署,也為了讓開發人員能夠更快速的疊代開發,Sqoop獨立成為一個Apache項目。

簡單來說:sqoop是一款資料遷移工具。

官方網址:http://sqoop.apache.org/

下載下傳:http://archive.apache.org/dist/sqoop/

Ø Sqoop安裝:(要有hadoop環境)

1.上傳源碼 sqoop-1.4.4.bin__hadoop-2.0.4-alpha.tar.gz,并解壓

# tar -zxvf sqoop-1.4.4.bin__hadoop-2.0.4-alpha.tar.gz -C /itcast/

2.安裝和配置

2.1在/etc/profile添加sqoop到環境變量

export SQOOP_HOME=/itcast/sqoop-1.4.4.bin__hadoop-2.0.4-alpha

export PATH=$PATH:$SQOOP_HOME/bin

2.2讓配置生效

source /etc/profile

3.将資料庫連接配接驅動拷貝到$SQOOP_HOME/lib裡

Ø 測試Sqooq使用(*****會用*****)

第一類:資料庫中的資料導入到HDFS上

# sqoop import --connect jdbc:mysql://192.168.0.104:3306/mysql --username root --password itcast --table help_category --target-dir '/sqoop/td'

指定輸出路徑、指定資料分隔符

# sqoop import --connect jdbc:mysql://192.168.0.104:3306/mysql --username root --password itcast --table help_category --target-dir '/sqoop/td' --fields-terminated-by '\t'

指定Map數量 -m

#sqoop import --connect jdbc:mysql://192.168.0.104:3306/mysql --username root --password itcast --table help_category --target-dir '/sqoop/td1' --fields-terminated-by '\t' -m 1

增加where條件, 注意:條件必須用引号引起來

#sqoop import --connect jdbc:mysql://192.168.0.104:3306/mysql --username root --password itcast --table help_category --where 'help_category_id>3' --target-dir '/sqoop/td2' --fields-terminated-by '\t' -m 1

增加query語句(使用 \ 将語句換行)

sqoop import --connect jdbc:mysql://192.168.0.104:3306/mysql --username root --password itcast \

--query 'SELECT * FROM help_category where help_category_id > 2 AND $CONDITIONS' --split-by help_category.help_category_id --target-dir '/sqoop/td3'

注意:如果使用--query這個指令的時候,需要注意的是where後面的參數,AND $CONDITIONS這個參數必須加上

而且存在單引号與雙引号的差別,如果--query後面使用的是雙引号,那麼需要在$CONDITIONS前加上\即\$CONDITIONS

如果設定map數量為1個時即-m 1,不用加上--split-by ${tablename.column},否則需要加上

第二類:将HDFS上的資料導出到資料庫中

sqoop export --connect jdbc:mysql://192.168.0.104:3306/mytest --username root --password itcast --export-dir '/td3' --table td_bak -m 1 --fields-termianted-by '\t'

注意:以上測試要配置mysql遠端連接配接

GRANT ALL PRIVILEGES ON mytest.* TO 'root'@'192.168.0.104' IDENTIFIED BY 'itcast' WITH GRANT OPTION;

FLUSH PRIVILEGES;

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'itcast' WITH GRANT OPTION;

FLUSH PRIVILEGES

2.Storm

Ø Storm簡介:(****了解****)

官方網址:http://storm.apache.org/

Apache Storm是一個免費、開源的分布式實時計算系統。使用它可以輕松實作資料流的實時處理和Hadoop的批處理。Strom很簡單,可以用任何程式設計語言!

Storm用例:實時線上分析,機器學習,連續計算,分布式RPC,ETL等。

Strom特點:

快速:基準時鐘在超過一百萬元組每秒處理的每個節點。

簡易的設定:有可擴充性、容錯性,保證了資料的處理能力,并且易于設定和操作。

Ø Storm的發展(***了解****)

流式計算的曆史

早在7、8年前諸如UC伯克利、斯坦福等大學就開始了對流式資料處理的研究,但是由于更多的關注于金融行業的業務場景或者網際網路流量監控的業務場 景,以及當時網際網路資料場景的限制,造成了研究多是基于對傳統資料庫處理的流式化,對流式架構本身的研究偏少。目前這樣的研究逐漸沒有了聲音,工業界更多 的精力轉向了實時資料庫。

2010年Yahoo!對S4的開源,2011年twitter對Storm的開源,改變了這個情況。以前網際網路的開發人員在做一個實時應用的時 候,除了要關注應用邏輯計算處理本身,還要為了資料的實時流轉、互動、分布大傷腦筋。但是現在情況卻大為不同,以Storm為例,開發人員可以快速的搭建 一套健壯、易用的實時流處理架構,配合SQL産品或者NoSQL産品或者MapReduce計算平台,就可以低成本的做出很多以前很難想象的實時産品:比 如一淘資料部的量子恒道品牌旗下的多個産品就是建構在實時流處理平台上的。

流式計算的最新進展

在資料處理時間和方式上,Storm與Hadoop MapReduce基本上是兩個對立面,而這兩個技術具備整合可能性極大程度該歸結于 YARN這個叢集管理層。Hortonworks當下正在緻力于通過新型處理架構Tez 來 提高Hive的速度,同時YARN還允許Hadoop使用者 運作Spark記憶體處理架構。同時, 微軟也在使用YARN讓Hadoop更加适合機器學習用例。

  此外,通過YARN,同叢集上同時運作HBase、 Giraph等不同技術也成為可能。此外,叢集管理技術Mesos(加州大學伯克利分校出品,現已成為Apache項目) 同樣支援了類似YARN功能,盡管其不是像YARN這樣與HDFS捆綁。

更多技術的整合預示Hadoop這個大資料處理平台絕不是昙花一現,同時也會讓Hadoop在大資料應用程式領域獲得更高的統治力。

Ø 體驗一下實時計算:(****了解****)
Hadoop-Day7_sqoop_storm

解壓apache-storm-0.9.2-incubating.tar.gz

到解壓目錄lib下,把所有jar導入工程。

編寫工程代碼:(或見工程源碼)

WordCountTopo.java

package cn.itcast.storm;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import cn.itcast.storm.bolt.WordCounter;

import cn.itcast.storm.bolt.WordSpliter;

import cn.itcast.storm.spout.WordReader;

public class WordCountTopo {

/**

* Storm word count demo

*

* @param args

*/

public static void main(String[] args) {

if (args.length != 2) {

System.err.println("Usage: inputPaht timeOffset");

System.err.println("such as : java -jar WordCount.jar D://input/ 2");

System.exit(2);

}

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word-reader", new WordReader());

builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping("word-reader");

builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-spilter");

String inputPaht = args[0];

String timeOffset = args[1];

Config conf = new Config();

conf.put("INPUT_PATH", inputPaht);

conf.put("TIME_OFFSET", timeOffset);

conf.setDebug(false);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("WordCount", conf, builder.createTopology());

WordCounter.java

package cn.itcast.storm.bolt;

import java.util.HashMap;

import java.util.Map;

import java.util.Map.Entry;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

private static final long serialVersionUID = 5683648523524179434L;

private HashMap<String, Integer> counters = new HashMap<String, Integer>();

@Override

@SuppressWarnings("rawtypes")

public void prepare(Map stormConf, TopologyContext context) {

final long timeOffset = Long.parseLong(stormConf.get("TIME_OFFSET").toString());

new Thread(new Runnable() {

public void run() {

while (true) {

for (Entry<String, Integer> entry : counters.entrySet()) {

System.out.println(entry.getKey() + " : " + entry.getValue());

System.out.println("---------------------------------------");

try {

Thread.sleep(timeOffset * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}).start();

public void execute(Tuple input, BasicOutputCollector collector) {

String str = input.getString(0);

if (!counters.containsKey(str)) {

counters.put(str, 1);

} else {

Integer c = counters.get(str) + 1;

counters.put(str, c);

public void declareOutputFields(OutputFieldsDeclarer declarer) {

WordSpliter.java

import org.apache.commons.lang.StringUtils;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

public class WordSpliter extends BaseBasicBolt {

private static final long serialVersionUID = -5653803832498574866L;

String line = input.getString(0);

String[] words = line.split(" ");

for (String word : words) {

word = word.trim();

if (StringUtils.isNotBlank(word)) {

word = word.toLowerCase();

collector.emit(new Values(word));

declarer.declare(new Fields("word"));

WordReader.java

package cn.itcast.storm.spout;

import java.io.File;

import java.io.IOException;

import java.util.Collection;

import java.util.List;

import org.apache.commons.io.FileUtils;

import org.apache.commons.io.filefilter.FileFilterUtils;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.topology.base.BaseRichSpout;

public class WordReader extends BaseRichSpout {

private static final long serialVersionUID = 2197521792014017918L;

private String inputPath;

private SpoutOutputCollector collector;

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

this.collector = collector;

inputPath = (String) conf.get("INPUT_PATH");

public void nextTuple() {

Collection<File> files = FileUtils.listFiles(new File(inputPath), FileFilterUtils.notFileFilter(FileFilterUtils.suffixFileFilter(".bak")), null);

for (File f : files) {

List<String> lines = FileUtils.readLines(f, "UTF-8");

for (String line : lines) {

collector.emit(new Values(line));

FileUtils.moveFile(f, new File(f.getPath() + System.currentTimeMillis() + ".bak"));

} catch (IOException e) {

declarer.declare(new Fields("line"));

Ø Storm與Hadoop的對比(****了解****)

Topology 與 Mapreduce

一個關鍵的差別是: 一個MapReduce job最終會結束, 而一個topology永遠會運作(除非你手動kill掉)

Nimbus 與 ResourManager

在Storm的叢集裡面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運作一個叫Nimbus背景程式,它的作用類似Hadoop裡面的JobTracker。Nimbus負責在叢集裡面分發代碼,配置設定計算任務給機器, 并且監控狀态。

Supervisor (worker程序)與NodeManager(YarnChild)

每一個工作節點上面運作一個叫做Supervisor的節點。Supervisor會監聽配置設定給它那台機器的工作,根據需要啟動/關閉工作程序。每一個工作程序執行一個topology的一個子集;一個運作的topology由運作在很多機器上的很多工作程序組成。

Ø Storm的基本概念(****了解****)

在深入了解Storm之前,需要了解一些概念:

Topologies : 拓撲,也俗稱一個任務

Spouts : 拓撲的消息源

Bolts : 拓撲的處理邏輯單元

tuple:消息元組

Streams : 流

Stream groupings :流的分組政策

Tasks : 任務處理單元

Executor :工作線程

Workers :工作程序

Configuration : topology的配置

圖見資料-->淘寶資料月光寶盒雙11項目:

Hadoop-Day7_sqoop_storm
Ø Storm架構體系:(****了解*****)
Hadoop-Day7_sqoop_storm

Topologies 邏輯單元:Spouts 與 Bolts 消息

Hadoop-Day7_sqoop_storm
Hadoop-Day7_sqoop_storm

Strom叢集:

Hadoop-Day7_sqoop_storm
Hadoop-Day7_sqoop_storm
Hadoop-Day7_sqoop_storm
Ø Storm叢集部署(***必須掌握****)

1、安裝一個zookeeper叢集

2、上傳storm的安裝包,解壓

3、修改配置檔案storm.yaml

#所使用的zookeeper叢集主機

storm.zookeeper.servers:

- "zookeeperServer1"

- "zookeeperServer2"

- "zookeeperServer3"

#nimbus所在的主機名

nimbus.host: "zookeeperServer1"

//可以指定worker數量,如果不指定它根據你機器配置自動啟動

supervisor.slots.ports

-6701

-6702

-6703

-6704

-6705

4.将配置好的storm環境複制到zookeeperServer2,zookeeperServer3

# scp -r /itcast/apache-storm-0.9.2-incubating zookeeperServer2:/itcast

# scp -r /itcast/apache-storm-0.9.2-incubating zookeeperServer3:/itcast

啟動storm

# cd /itcast/apache-storm-0.9.2-incubating/bin

在nimbus主機上

//啟動協調管理nimbus

./storm nimbus 1>/dev/null 2>&1 &

//啟動web管理界面 啟動後可以通過nimbus主機名:8080端口進行仿問

./storm ui 1>/dev/null 2>&1 &

在supervisor主機上

./storm supervisor 1>/dev/null 2>&1 &

Ø 電信實時統計例子:(****擴充****)
Hadoop-Day7_sqoop_storm

RandomWordSpout.java

package cn.itcast.stormdemo;

import java.util.Random;

import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

//模拟一些資料

String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};

//不斷地往下一個元件發送tuple消息

//這裡面是該spout元件的核心邏輯

//可以從kafka消息隊列中拿到資料,簡便起見,我們從words數組中随機挑 //選一個商品名發送出去

Random random = new Random();

int index = random.nextInt(words.length);

//通過随機數拿到一個商品名

String godName = words[index];

//将商品名封裝成tuple,發送消息給下一個元件

collector.emit(new Values(godName));

//每發送一個消息,休眠500ms

Utils.sleep(500);

//初始化方法,在spout元件執行個體化時調用一次

//聲明本spout元件發送出去的tuple中的資料的字段名

declarer.declare(new Fields("orignname"));

SuffixBolt.java

import java.io.FileWriter;

import java.util.UUID;

public class SuffixBolt extends BaseBasicBolt{

FileWriter fileWriter = null;

//在bolt元件運作過程中隻會被調用一次

fileWriter = new FileWriter("/root/stormoutput/"+UUID.randomUUID());

throw new RuntimeException(e);

//該bolt元件的核心處理邏輯

//每收到一個tuple消息,就會被調用一次

public void execute(Tuple tuple, BasicOutputCollector collector) {

//先拿到上一個元件發送過來的商品名稱

String upper_name = tuple.getString(0);

String suffix_name = upper_name + "_itisok";

//為上一個元件發送過來的商品名稱添加字尾

fileWriter.write(suffix_name);

fileWriter.write("\n");

fileWriter.flush();

//本bolt已經不需要發送tuple消息到下一個元件,是以不需要再聲明tuple //的字段

public void declareOutputFields(OutputFieldsDeclarer arg0) {

UpperBolt.java

public class UpperBolt extends BaseBasicBolt{

//業務處理邏輯

//先擷取到上一個元件傳遞過來的資料,資料在tuple裡面

String godName = tuple.getString(0);

//将商品名轉換成大寫

String godName_upper = godName.toUpperCase();

//将轉換完成的商品名發送出去

collector.emit(new Values(godName_upper));

//聲明該bolt元件要發出去的tuple的字段

declarer.declare(new Fields("uppername"));

TopoMain.java

import backtype.storm.StormSubmitter;

import backtype.storm.generated.StormTopology;

* 組織各個處理元件形成一個完整的處理流程,就是所謂的topology(類似于 mapreduce程式中的job)

* 并且将該topology送出給storm叢集去運作,topology送出到叢集後就将永無休止地運作,除非人為或者異常退出

public class TopoMain {

public static void main(String[] args) throws Exception {

//将我們的spout元件設定到topology中去

//parallelism_hint :4 表示用4個excutor來執行這個元件

//setNumTasks(8) 設定的是該元件執行時的并發task數量,也就意味着1個excutor會運作2個task

builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);

//将大寫轉換bolt元件設定到topology,并且指定它接收randomspout元件的消息

//.shuffleGrouping("randomspout")包含兩層含義:

//1、upperbolt元件接收的tuple消息一定來自于randomspout元件

//2、randomspout元件和upperbolt元件的大量并發task執行個體之間收發消息時采用的分組政策是随機分組shuffleGrouping

builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

//将添加字尾的bolt元件設定到topology,并且指定它接收upperbolt元件的消息

builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

//用builder來建立一個topology

StormTopology demotop = builder.createTopology();

//配置一些topology在叢集中運作時的參數

//這裡設定的是整個demotop所占用的槽位數,也就是worker的數量

conf.setNumWorkers(4);

conf.setDebug(true);

conf.setNumAckers(0);

//将這個topology送出給storm叢集運作

StormSubmitter.submitTopology("demotopo", conf, demotop);

把工程打成jar包送出上去運作。

送出的Topologies運作格式如下:

指令格式:storm jar 【jar路徑】 【拓撲包名.拓撲類名】【stormIP位址】【storm端口】【拓撲名稱】【參數】

如:

storm jar /home/storm/storm-starter.jar storm.starter.WordCountTopology wordcountTop;

#送出storm-starter.jar到遠端叢集,并啟動wordcountTop拓撲。

Ø Storm 常用指令(****記住****)

1、啟動nimbus背景程式

指令格式:storm nimbus

2、啟動supervisor背景程式

指令格式:storm supervisor

3、啟動ui服務

指令格式:storm ui

4、送出Topologies

eg:

5、停止Topologies

檢視目前運作的topo: storm list

指令格式:storm kill 【拓撲名稱】

樣例:storm kill wordcountTop

#殺掉wordcountTop拓撲。

Ø Storm 相關配置項(***知道****)

在storm.yaml中常用的幾個選項

storm.zookeeper.root

Storm在zookeeper叢集中的根目錄,預設是“/”

topology.workers

每個Topology運作時的worker的預設數目,若在代碼中設定,則此選項值被覆寫

storm.zookeeper.servers

zookeeper叢集的節點清單

storm.local.dir

Storm用于存儲jar包和臨時檔案的本地存儲目錄

ui.port

Storm叢集的UI位址端口号,預設是8080

nimbus.host:

Nimbus節點的host

Supervisor節點的worker占位槽,叢集中的所有Topology公用這些槽位數,即使送出時設定了較大數值的槽位數,系統也會按照目前叢集中實際剩餘的槽位數來進行配置設定,當所有的槽位數都配置設定完時,新送出的Topology隻能等待,系統會一直監測是否有空餘的槽位空出來,如果有,就再次給新送出的Topology配置設定。

Ø Storm程式設計接口(****了解****)

l Spouts

Spout是Stream的消息産生源, Spout元件的實作可以通過繼承BaseRichSpout類或者其他*Spout類來完成,也可以通過實作IRichSpout接口來實作。

需要根據情況實作Spout類中重要的幾個方法有:

Hadoop-Day7_sqoop_storm

open方法

當一個Task被初始化的時候會調用此open方法。一般都會在此方法中對發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext初始化。

示例如下:

1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

this._collector = collector;

getComponentConfiguration方法

此方法用于聲明針對目前元件的特殊的Configuration配置。

public Map<String, Object> getComponentConfiguration() {

if(!_isDistributed) {

Map<String, Object> ret = new HashMap<String, Object>();

ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3);

return ret;10 11

return null;

這裡便是設定了Topology中目前Component的線程數量上限。

nextTuple方法

這是Spout類中最重要的一個方法。發射一個Tuple到Topology都是通過這個方法來實作的。

Utils.sleep(100);

final String[] words = new String[] {"twitter","facebook","google"};

final Random rand = new Random();

final String word = words[rand.nextInt(words.length)];

_collector.emit(new Values(word));

這裡便是從一個數組中随機選取一個單詞作為Tuple,然後通過_collector發送到Topology。

declareOutputFields方法

此方法用于聲明目前Spout的Tuple發送流。

Stream流的定義是通過OutputFieldsDeclare.declare方法完成的,其中的參數包括了發送的域Fields。

另外,除了上述幾個方法之外,還有ack、fail和close方法等;

Storm在監測到一個Tuple被成功處理之後會調用ack方法,處理失敗會調用fail方法;

這兩個方法在BaseRichSpout等類中已經被隐式的實作了。

l Bolts

    Bolt類接收由Spout或者其他上遊Bolt類發來的Tuple,對其進行處理。Bolt元件的實作可以通過繼承BasicRichBolt類或者IRichBolt接口來完成。

    Bolt類需要實作的主要方法有:

Hadoop-Day7_sqoop_storm

prepare方法

此方法和Spout中的open方法類似,為Bolt提供了OutputCollector,用來從Bolt中發送Tuple。Bolt中Tuple的發送可以在prepare方法中、execute方法中、cleanup等方法中進行,一般都是些在execute中。

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

this. _collector = collector;

和Spout類一樣,在Bolt中也可以有getComponentConfiguration方法。

Map<String, Object> conf = new HashMap<String, Object>();

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,

emitFrequencyInSeconds);

return conf;

此例定義了從系統元件“_system”的“_tick”流中發送Tuple到目前Bolt的頻率,當系統需要每隔一段時間執行特定的處理時,就可以利用這個系統的元件的特性來完成。

execute方法

這是Bolt中最關鍵的一個方法,對于Tuple的處理都可以放到此方法中進行。具體的發送也是在execute中通過調用emit方法來完成的。

有兩種情況,一種是emit方法中有兩個參數,另一個種是有一個參數。

(1)emit有一個參數:此唯一的參數是發送到下遊Bolt的Tuple,此時,由上遊發來的舊的Tuple在此隔斷,新的Tuple和舊的Tuple不再屬于同一棵Tuple樹。新的Tuple另起一個新的Tuple樹。

(2)emit有兩個參數:第一個參數是舊的Tuple的輸入流,第二個參數是發往下遊Bolt的新的Tuple流。此時,新的Tuple和舊的Tuple是仍然屬于同一棵Tuple樹,即,如果下遊的Bolt處理Tuple失敗,則會向上傳遞到目前Bolt,目前Bolt根據舊的Tuple流繼續往上遊傳遞,申請重發失敗的Tuple。保證Tuple處理的可靠性。

用于聲明目前Bolt發送的Tuple中包含的字段,和Spout中類似。

declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));

此例說明目前Bolt類發送的Tuple包含了三個字段:"obj", "count", "actualWindowLengthInSeconds"。

l Topology

Hadoop-Day7_sqoop_storm
Ø Topology運作機制(***了解****)

(1)Storm送出後,會把代碼首先存放到Nimbus節點的inbox目錄下,之後,會把目前Storm運作的配置生成一個stormconf.ser檔案放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化之後的Topology代碼檔案;

(2)在設定Topology所關聯的Spouts和Bolts時,可以同時設定目前Spout和Bolt的executor數目和task數目,預設情況下,一個Topology的task的總和是和executor的總和一緻的。之後,系統根據worker的數目,盡量平均的配置設定這些task的執行。worker在哪個supervisor節點上運作是由storm本身決定的;

(3)任務配置設定好之後,Nimbes節點會将任務的資訊送出到zookeeper叢集,同時在zookeeper叢集中會有workerbeats節點,這裡存儲了目前Topology的所有worker程序的心跳資訊;

(4)Supervisor節點會不斷的輪詢zookeeper叢集,在zookeeper的assignments節點中儲存了所有Topology的任務配置設定資訊、代碼存儲目錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的内容,來領取自己的任務,啟動worker程序運作;

(5)一個Topology運作之後,就會不斷的通過Spouts來發送Stream流,通過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。

最後一步會不間斷的執行,除非手動結束Topology。

   Topology中的Stream處理時的方法調用過程如下:

Hadoop-Day7_sqoop_storm

有幾點需要說明的地方:

   (1)每個元件(Spout或者Bolt)的構造方法和declareOutputFields方法都隻被調用一次。

   (2)open方法、prepare方法的調用是多次的。入口函數中設定的setSpout或者setBolt裡的并行度參數指的是executor的數目,是負責運作元件中的task的線程 的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每個executor運作的時候調用一次。相當于一個線程的構造方法。

   (3)nextTuple方法、execute方法是一直被運作的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。隻有這樣不斷地運作,才會産生無界的Tuple流,展現實時性。相當于線程的run方法。

   (4)在送出了一個topology之後,Storm就會建立spout/bolt執行個體并進行序列化。之後,将序列化的component發送給所有的任務所在的機器(即Supervisor節 點),在每一個任務上反序列化component。

   (5)Spout和Bolt之間、Bolt和Bolt之間的通信,是通過zeroMQ的消息隊列實作的。

   (6)上圖沒有列出ack方法和fail方法,在一個Tuple被成功處理之後,需要調用ack方法來标記成功,否則調用fail方法标記失敗,重新處理這個Tuple。

終止Topology

通過在Nimbus節點利用如下指令來終止一個Topology的運作:

bin/storm kill topologyName

kill之後,可以通過UI界面檢視topology狀态,會首先變成KILLED狀态,在清理完本地目錄和zookeeper叢集中的和目前Topology相關的資訊之後,此Topology就會徹底消失

3.Kafka

官方網址:https://kafka.apache.org/

源碼下載下傳:http://archive.apache.org/dist/kafka/

Ø Kafka簡介:(****了解***)

Kafka是一個分布式的消息緩存系統,用于日志處理的分布式消息隊列。日志資料容量大,但對可靠性要求不高,其日志資料主要包括使用者行為(登入、浏覽、點選、分享、喜歡)以及系統運作日志(CPU、記憶體、磁盤、網絡、系統及程序狀态)。

目前很多的消息隊列服務提供可靠傳遞保證,并預設是即時消費(不适合離線)。高可靠傳遞對日志不是必須的,故可通過降低可靠性來提高性能,同時通過建構分布式的叢集,允許消息在系統中累積,使得kafka同時支援離線和線上日志處理。

Ø Kafka架構(***了解***)
Hadoop-Day7_sqoop_storm
Hadoop-Day7_sqoop_storm
Hadoop-Day7_sqoop_storm

l kafka叢集中的伺服器都叫做broker

l kafka有兩類用戶端,一類叫producer(消息生産者),一類叫做consumer(消息消費者),用戶端和broker伺服器之間采用tcp協定連接配接

l kafka中不同業務系統的消息可以通過topic進行區分,而且每一個消息topic都會被分區,以分擔消息讀寫的負載

l 每一個分區都可以有多個副本,以防止資料的丢失

l 某一個分區中的資料如果需要更新,都必須通過該分區所有副本中的leader來更新

l 消費者可以分組,比如有兩個消費者組A和B,共同消費一個topic:order_info,A和B所消費的消息不會重複

比如 order_info 中有100個消息,每個消息有一個id,編号從0-99,那麼,如果A組消費0-49号,B組就消費50-99号

l 消費者在具體消費某個topic中的消息時,可以指定起始偏移量

Ø 叢集安裝(***實踐***)

1、上傳kafka安裝檔案到zookeeperServer1進行解壓

# mkdir /itcast/kafka_2.10-0.8.1.1/kafka-logs

2、修改server.properties

broker.id=0

zookeeper.connect=zookeeperServer1:2181,zookeeperServer2:2181,zookeeperServer3:2181

log.dirs=/itcast/kafka_2.10-0.8.1.1/kafka-logs

修改kafka-run-class.sh

把113行的“-XX:+UseCompressedOops”去掉。

3、将zookeeperServer1的kafka複制到其它機器(zookeeperServer2,zookeeperServer3)

# scp -r /itcast/kafka_2.10-0.8.1.1 zookeeperServer2:/itcast/

# vim /itcast/kafka_2.10-0.8.1.1/config/server.properties

broker.id=1 //修改broder.id

# scp -r /itcast/kafka_2.10-0.8.1.1 zookeeperServer3:/itcast/

broker.id=2 //修改broder.id

3、将zookeeper叢集啟動

4、在每一台節點上啟動broker

bin/kafka-server-start.sh config/server.properties

或在背景啟動

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

5、在kafka叢集中建立一個topic

bin/kafka-topics.sh --create --zookeeper zookeeperServer1:2181 --replication-factor 3 --partitions 1 --topic order

6、用一個producer向某一個topic中寫入消息

bin/kafka-console-producer.sh --broker-list zookeeperServer1:9092 --topic order

7、用一個comsumer從某一個topic中讀取資訊

bin/kafka-console-consumer.sh --zookeeper zookeeperServer1:2181 --from-beginning --topic order

8、檢視一個topic的分區及副本狀态資訊

bin/kafka-topics.sh --describe --zookeeper zookeeperServer1:2181 --topic order

Ø 發送業務消息用戶端編寫(***實踐***)

Java工程中導入${KAFKA_HOME}/lib下所有jar包

ProducerDemo.java

package cn.itcast.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class ProducerDemo {

Properties props = new Properties();

props.put("zk.connect", "zookeeperServer1:2181,zookeeperServer2:2181,zookeeperServer3:2181");

props.put("metadata.broker.list","zookeeperServer1:9092,zookeeperServer2:9092,zookeeperServer3:9092");

props.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

// 發送業務消息

// 讀取檔案 讀取記憶體資料庫 讀socket端口

for (int i = 1; i <= 100; i++) {

Thread.sleep(500);

producer.send(new KeyedMessage<String, String>("order",

"我是第" + i + "次來買東本西!"));

Ø 接收業務消息用戶端編寫(***實踐***)

ConsumerDemo.java

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

public class ConsumerDemo {

private static final String topic = "order";

private static final Integer threads = 1;

props.put("zookeeper.connect", "zookeeperServer1:2181,zookeeperServer2:2181,zookeeperServer3:2181");

props.put("group.id", "1111");

props.put("auto.offset.reset", "smallest");

ConsumerConfig config = new ConsumerConfig(props);

ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(topic, threads);

topicCountMap.put("order", 1);

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("order");

for(final KafkaStream<byte[], byte[]> kafkaStream : streams){

for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){

String msg = new String(mm.message());

System.out.println(msg);

歲月裡,寒暑交替。人世間,北來南往。銘心的,雲煙的。都付往事,不念,不問。

繼續閱讀