公司datalink平台負責從各種資料源讀取資料并同步到其他的同步或者異構資料源,最近增加的HBase的reader利用到了Hbase的Replication特性。
正常情況下,我們配置HBase的Replication需要在主叢集上Hbase的shell裡進行如下一系列的配置。
登入到HMaster節點,打開Hbase的shell指令行,我們進行如下步驟:
1、打開主節點裡要複制的表列族的複制特性
disable 'order_detail'
alter 'order_detail', {NAME => 'basic_info',REPLICATION_SCOPE => '1'}
,{NAME => 'ext_info',REPLICATION_SCOPE => '1'}
enable 'order_detail'
2、主節點裡添加要複制的從節點的peer
add_peer '2','10.102.22.33:2181:/hbase'
3、設定peer下的表、列族的複制關系。
set_peer_tableCFs '2','order_detail'
思考:
這些都是在Hbase的指令行裡做的,我們的目标是通過管理平台能夠做到更友好、更友善的管理,那有沒有别的方式呢?
關于表的複制特性,我們建表的時候可以通過HBaseAdmin以代碼的方式實作,即在建表時指定REPLICATION_SCOPE為1
其實在指令行建表最終也是通過HbaseAdmin實作的。
那顯然在指令行中配置Replication一定也有對應的代碼。通過翻閱代碼和API我們找到了對應的實作類:ReplicationAdmin。
ReplicationAdmin是叢集副本操作的Admin工具類,主要職責是增加删除查詢 peer操作和設定peer同步表中繼資料TableCFs。
下面看下ReplicationAdmin的API:
方法名 | 參數 | 傳回值 | 注解 | 備注 |
---|---|---|---|---|
ReplicationAdmin | Configuration | ReplicationAdmin | 無 | 構造方法,參數Configuration為hbase叢集連結配置資訊 |
addPeer | String id String clusterKey | void | @Deprecated 已過期 | 添加副本叢集的方法,參數解析 id:peer ID,在hbase叢集中應唯一 clusterKey:副本叢集,格式:zk_address:port:<zkHbaseParent> |
addPeer | String id String clusterKey String tableCFs | void | @Deprecated 已過期 | 添加副本叢集的方法,參數解析 id:peer ID,在hbase叢集中應唯一 clusterKey:副本叢集,格式:zk_address:port:<zkHbaseParent> tableCFs:需要同步的表-列簇,格式:table_a:f1;table_a:f2;table_b:f1;table_b:f2 |
addPeer | String id ReplicationPeerConfig peerConfig Map<TableName,? extends Collection<String>> tableCfs | void | 無 | 添加副本叢集的方法,參數解析 id: peer ID,在hbase叢集中應唯一 peerConfig:副本叢集配置封裝,将副本叢集位址封裝到了peerConfig對象 tableCfs:需要同步的表-列簇,key 使用TableName對象表示,value使用 Collection<String> 實作類表示 |
removePeer | String id | void | 無 | 删除指定id的副本叢集 |
enablePeer | String id | void | 無 | 使指定副本叢集同步生效,生效後主叢集才開始以rpc方式推送資料 |
disablePeer | String id | void | 無 | 使指定副本叢集同步失效 |
getPeersCount | 無 | int | 無 | 傳回目前叢集副本叢集數量 |
listPeers | 無 | Map<String,String> | @Deprecated 已過期 | 傳回目前叢集副本集合 Key:peer ID value:副本叢集位址,格式:zk_address:port:<zkHbaseParent> |
listPeerConfigs | 無 | Map<String,ReplicationPeerConfig> | 無 | 傳回目前叢集副本集合 Key:peer ID value:ReplicationPeerConfig 副本封裝對象,内部封裝叢集位址 |
getPeerConfig | String id | ReplicationPeerConfig | 無 | 傳回指定副本叢集的叢集資訊 |
getPeerTableCFs | String id | String | 無 | 傳回指定副本叢集的TableCFs資訊,既所同步的表&表列簇。 |
setPeerTableCFs | String id String tableCFs | void | @Deprecated 已過期 | 設定指定副本叢集,從主叢集同步的表&表列簇。 tableCFs格式:table_a:f1;table_a:f2;table_b:f1;table_b:f2 |
setPeerTableCFs | String id Map<TableName,? extends Collection<String>> tableCfs | void | 無 | 設定指定副本叢集,從主叢集同步的表&表列簇。 tableCfs描述:需要同步的表-列簇,key 使用TableName對象表示,value使用 Collection<String> 實作類表示 |
getPeerState | String id | boolean | 無 | 擷取指定Peer副本叢集的,有效狀态,enable狀态傳回true,否則傳回false |
listReplicated | 無 | List<HashMap<String,String>> | 無 | 傳回目前叢集中,表-表列簇scope不為0的集合 key_1:"tableName" value_1:<tableName> key_2:"columnFamlyName" value_2:<columnName> key_3:"replicationType" value_3:<1> |
enableTableRep | final TableName tableName | void | 無 | 設定目前叢集的指定表開啟同步标記,既表的列簇全部設定scope為1; 該方法會查詢peer清單中所有的叢集,判斷peer代表的副本叢集是否擁有指定表,如果沒有指定表,則從主叢集複制表結構到該副本叢集 |
disableTableRep | final TableName tableName | void | 無 | 設定目前叢集的指定表關閉同步标記,既标的列簇全部設定scope為0 |
使用案例
add_Peer
/**
* 該用例添加叢集10.102.22.33:2181:/hbase_0.98為目前叢集的副本叢集,peer id為test_repl_01
*/
@Test
public void exampleAddPeer() {
try{
Configuration configuration = HBaseManager.getConfiguration("framework");//内部代碼,擷取配置
ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration);
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
peerConfig.setClusterKey("10.102.22.33:5181:/hbase_0.98");
replicationAdmin.addPeer("test_repl_01",peerConfig,null);
} catch (IOException e) {
e.printStackTrace();
} catch (ReplicationException e) {
e.printStackTrace();
}
}
setPeerTableCfs
/**
* 這段代碼表示副本叢集peer:"test_repl_01" 需要同步表example_table_one的列簇f1,f2
* 設定後,隻要主叢集的example_table_one表的f1,f2列簇的scope為1,peer:"test_repl_01"代表的叢集即可同步資料
*/
@Test
public void exampleSetPeer() {
try {
Configuration configuration = HBaseManager.getConfiguration("framework");//内部代碼,擷取配置
ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration);
Map<TableName,List<String>> cfs = new HashMap<TableName,List<String>>();
TableName tableName = TableName.valueOf("example_table_one");
//key:tableName,value list;表示 table<example_table_one> 的列簇f1,f2 需要同步
cfs.put(tableName,Arrays.asList("f1","f2"));
//test_repl_01 為副本叢集的唯一id
replicationAdmin.setPeerTableCFs("test_repl_01",cfs);
} catch (IOException e) {
e.printStackTrace();
} catch (ReplicationException e) {
e.printStackTrace();
}
}