天天看點

storm學習筆記(二)——Storm元件詳解之Tuple、SpoutTuple元組Spout資料源

目錄

Tuple元組

結構

生命周期

Spout資料源

結構

開發spout元件

Storm的核心概念包括:Stream、Spout、Bolt、Tuple、Task、Worker、Stream Grouping、Topology

Stream是被處理的資料,Spout是資料源,Bolt是處理資料的容器,Tuple是資料單元,Task是運作Spout和Bolt中的線程,Worker是運作這些線程的程序,Stream Grouping規定了Bolt接受何種類型的資料最為輸入,Topology是由Stream Grouping連接配接起來的Spout和Bolt節點網絡。

Tuple元組

結構

Tuple是Storm的主要資料結構,是Storm中使用的最基本單元、資料模型和元組。

Tuple就是一個值清單,Tuple中的值可以是任何類型的,動态類型的Tuple的fields可以不用聲明。

預設情況下,Storm中的Tuple支援私有類型、字元串、位元組數組等作為他的字段值。

Tuple的字段預設類型有:integer、float、double、long、short、string、byte、binary(byte[ ])。

資料結構如下圖:可以了解成一個鍵值對類型的資料結構。

storm學習筆記(二)——Storm元件詳解之Tuple、SpoutTuple元組Spout資料源

生命周期

下段java代碼展示了Spout(消息源)接口發出Tuple(消息)的整個過程,源碼如下:

public interface ISpout extends Serializable{
    void open(Map conf,TopologyContext context,SpoutOutputCollector collector);
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
    void close();
}
           

首先,Storm調用Spout(消息源)的nextTuple方法來擷取下一個Tuple,Spout通過Open方法的參數提供的SpoutOutputCollector将新Tuple發射到其中一個輸出消息流。發射Tuple時,Spout提供一個message-id,通過這個ID來追蹤該Tuple。然後,Storm跟蹤該Tuple的樹形結構是否成功建立,從根據message-id調用Spout中的ack函數,以确認Tuple是否被完全處理。如果Tuple逾時,則調用Spout的fail方法。由此看出,同一個Tuple不管是acked還是failed都是由建立它的Spout發出并維護的,是以Storm會利用内部的Acker機制保證每個Tuple被可靠地處理。最後,在任務完成後,Spout調用Close方法結束Tuple的使命。

Spout資料源

結構

資料源(消息源)Spout是Storm的Topology中的消息生産者(Tuple的創造者),最源頭的接口是IComponent,如下圖所示,幾個Spout接口都繼承自IComponent。

storm學習筆記(二)——Storm元件詳解之Tuple、SpoutTuple元組Spout資料源

Spout從外部擷取資料後,向Topology中發出的Tuple可以是可靠的,也可以是不可靠的。一個可靠的消息源可以重新發射一個Tuple(如果該Tuple沒有被Storm成功處理),但是一個不可靠的消息源,Spout一旦發出一個Tuple就把它徹底“遺忘”,也就不可能再發了。

Spout可以發射多個流。要達到這樣的效果,使用OutputFieldsDeclarer.declareStream來定義多個流(定義多個Stream),然後使用SpoutOutputCollector來發射指定的流。

Spout的最頂層抽象是ISpout接口,在通常情況下(Shell和事務型的除外),實作一個Spout,可以直接實作接口IRichSpout,如果不想寫多餘代碼,可以直接繼承BaseRichSpout。

開發spout元件

下段代碼是開發Spout元件的一個簡單的執行個體:建立普通Java工程,導入storm依賴包到lib檔案夾下,buildpath之後即可。

package storm;

import java.util.Map;
import java.util.Random;
import java.util.stream.Collector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/*
 * 用于産生資料源
 * 本例中 資料源是不斷生成的一個1-100内的随機數
 */
public class NumberSpout extends BaseRichSpout {
	private SpoutOutputCollector collector;

	/*
	 * 這是Spout類中最重要的一個方法。用于發射Tuple
	 * 
	 */
	@Override
	public void nextTuple() {
		// TODO Auto-generated method stub
		while(true){
			int randomNum = new Random().nextInt(100);
			//Values可以了解為是Tuple的值,是一個集合類型,值可以是一個,也可以是多個
			Values value = new Values(randomNum);
			//emit方法用于發射元組
			collector.emit(value);
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}
	
	/*
	 * 當一個Task被初始化時會調用此open方法。
	 * 一般都會在此方法中初始化發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext。
	 *
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		// 對collector進行初始化,因為nextTuple()方法利用collector發射元組
		this.collector = collector;
	}

	/*
	 * 此方法用于聲名目前spout的Tuple發送流,
	 * 流的定義是通過OutputFieldsDeclare.declareStream方法完成的
	 * 其中的參數包括了發送的域Fields。
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Fields可以了解為時Tuple的鍵
		declarer.declare(new Fields("number"));
		
		
	}
	
}
           

代碼說明:從100以内的整數中随機産生一個數作為Tuple的值,然後通過_collector發送到Topology。Spout的最重要方法是nextTuple。nextTuple方法發射一個新的元組到Topology,如果沒有新元組發射,則直接傳回。

注意:任務Spout的nextTuple方法都不要實作成阻塞的,因為Storm是在相同的線程中調用Spout的方法。

此外,Spout的另外兩個重要方法是ack和fail方法,當Spout發射的元組被拓撲成功處理時,調用ack方法;當處理失敗時,調用fail方法。ack和fail方法僅可被可靠的Spout調用。