Hbase實戰之谷粒微網誌
1 需求分析
1) 微網誌内容的浏覽,資料庫表設計
2) 使用者社交展現:關注使用者,取關使用者
3) 拉取關注的人的微網誌内容
2 代碼實作
2.1 代碼設計總覽:
1) 建立命名空間以及表名的定義
2) 建立微網誌内容表
3) 建立使用者關系表
4) 建立使用者微網誌内容接收郵件表
5) 釋出微網誌内容
6) 添加關注使用者
7) 移除(取關)使用者
8) 擷取關注的人的微網誌内容
9) 測試
資料庫關系表
2.2 建立命名空間以及表名的定義
//擷取配置conf
private Configuration conf = HBaseConfiguration.create();
//微網誌内容表的表名
private static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
//使用者關系表的表名
private static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
//微網誌收件箱表的表名
private static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");
public void initNamespace(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//命名空間類似于關系型資料庫中的schema,可以想象成檔案夾
NamespaceDescriptor weibo = NamespaceDescriptor
.create("weibo")
.addConfiguration("creator", "Jinji")
.addConfiguration("create_time", System.currentTimeMillis() + "")
.build();
admin.createNamespace(weibo);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.3 建立微網誌内容表
表結構:
方法名 | creatTableeContent |
Table Name | weibo:content |
RowKey | 使用者ID_時間戳 |
ColumnFamily | info |
ColumnLabel | 标題,内容,圖檔 |
Version | 1個版本 |
代碼:
/**
* 建立微網誌内容表
* Table Name:weibo:content
* RowKey:使用者ID_時間戳
* ColumnFamily:info
* ColumnLabel:标題 内容 圖檔URL
* Version:1個版本
*/
public void createTableContent(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//建立表表述
HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
//建立列族描述
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
//設定塊緩存
info.setBlockCacheEnabled(true);
//設定塊緩存大小
info.setBlocksize(2097152);
//設定壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設定版本确界
info.setMaxVersions(1);
info.setMinVersions(1);
content.addFamily(info);
admin.createTable(content);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.4 建立使用者關系表
表結構:
方法名 | createTableRelations |
Table Name | weibo:relations |
RowKey | 使用者ID |
ColumnFamily | attends、fans |
ColumnLabel | 關注使用者ID,粉絲使用者ID |
ColumnValue | 使用者ID |
Version | 1個版本 |
代碼:
/**
* 使用者關系表
* Table Name:weibo:relations
* RowKey:使用者ID
* ColumnFamily:attends,fans
* ColumnLabel:關注使用者ID,粉絲使用者ID
* ColumnValue:使用者ID
* Version:1個版本
*/
public void createTableRelations(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
//關注的人的列族
HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends"));
//設定塊緩存
attends.setBlockCacheEnabled(true);
//設定塊緩存大小
attends.setBlocksize(2097152);
//設定壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設定版本确界
attends.setMaxVersions(1);
attends.setMinVersions(1);
//粉絲列族
HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
fans.setBlockCacheEnabled(true);
fans.setBlocksize(2097152);
fans.setMaxVersions(1);
fans.setMinVersions(1);
relations.addFamily(attends);
relations.addFamily(fans);
admin.createTable(relations);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.5 建立微網誌收件箱表
表結構:
方法名 | createTableReceiveContentEmails |
Table Name | weibo:receive_content_email |
RowKey | 使用者ID |
ColumnFamily | info |
ColumnLabel | 使用者ID |
ColumnValue | 取微網誌内容的RowKey |
Version | 1000 |
代碼:
/**
* 建立微網誌收件箱表
* Table Name: weibo:receive_content_email
* RowKey:使用者ID
* ColumnFamily:info
* ColumnLabel:使用者ID-釋出微網誌的人的使用者ID
* ColumnValue:關注的人的微網誌的RowKey
* Version:1000
*/
public void createTableReceiveContentEmail(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
info.setBlockCacheEnabled(true);
info.setBlocksize(2097152);
info.setMaxVersions(1000);
info.setMinVersions(1000);
receive_content_email.addFamily(info);;
admin.createTable(receive_content_email);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.6 釋出微網誌内容
a、微網誌内容表中添加1條資料
b、微網誌收件箱表對所有粉絲使用者添加資料
代碼:Message.java
public class Message {
private String uid;
private String timestamp;
private String content;
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
}
}
代碼:public void publishContent(String uid, String content)
/**
* 釋出微網誌
* a、微網誌内容表中資料+1
* b、向微網誌收件箱表中加入微網誌的Rowkey
*/
public void publishContent(String uid, String content){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、微網誌内容表中添加1條資料,首先擷取微網誌内容表描述
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
//組裝Rowkey
long timestamp = System.currentTimeMillis();
String rowKey = uid + "_" + timestamp;
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content));
contentTBL.put(put);
//b、向微網誌收件箱表中加入釋出的Rowkey
//b.1、查詢使用者關系表,得到目前使用者有哪些粉絲
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//b.2、取出目标資料
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes("fans"));
Result result = relationsTBL.get(get);
List<byte[]> fans = new ArrayList<byte[]>();
//周遊取出目前釋出微網誌的使用者的所有粉絲資料
for(Cell cell : result.rawCells()){
fans.add(CellUtil.cloneQualifier(cell));
}
//如果該使用者沒有粉絲,則直接return
if(fans.size() <= 0) return;
//開始操作收件箱表
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
List<Put> puts = new ArrayList<Put>();
for(byte[] fan : fans){
Put fanPut = new Put(fan);
fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
puts.add(fanPut);
}
recTBL.put(puts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.7 添加關注使用者
a、在微網誌使用者關系表中,對目前主動操作的使用者添加新關注的好友
b、在微網誌使用者關系表中,對被關注的使用者添加新的粉絲
c、微網誌收件箱表中添加所關注的使用者釋出的微網誌
代碼實作:public void addAttends(String uid, String... attends)
/**
* 關注使用者邏輯
* a、在微網誌使用者關系表中,對目前主動操作的使用者添加新的關注的好友
* b、在微網誌使用者關系表中,對被關注的使用者添加粉絲(目前操作的使用者)
* c、目前操作使用者的微網誌收件箱添加所關注的使用者釋出的微網誌rowkey
*/
public void addAttends(String uid, String... attends){
//參數過濾
if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
return;
}
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//使用者關系表操作對象(連接配接到使用者關系表)
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
List<Put> puts = new ArrayList<Put>();
//a、在微網誌使用者關系表中,添加新關注的好友
Put attendPut = new Put(Bytes.toBytes(uid));
for(String attend : attends){
//為目前使用者添加關注的人
attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
//b、為被關注的人,添加粉絲
Put fansPut = new Put(Bytes.toBytes(attend));
fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
//将所有關注的人一個一個的添加到puts(List)集合中
puts.add(fansPut);
}
puts.add(attendPut);
relationsTBL.put(puts);
//c.1、微網誌收件箱添加關注的使用者釋出的微網誌内容(content)的rowkey
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
Scan scan = new Scan();
//用于存放取出來的關注的人所釋出的微網誌的rowkey
List<byte[]> rowkeys = new ArrayList<byte[]>();
for(String attend : attends){
//過濾掃描rowkey,即:前置位比對被關注的人的uid_
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
//為掃描對象指定過濾規則
scan.setFilter(filter);
//通過掃描對象得到scanner
ResultScanner result = contentTBL.getScanner(scan);
//疊代器周遊掃描出來的結果集
Iterator<Result> iterator = result.iterator();
while(iterator.hasNext()){
//取出每一個符合掃描結果的那一行資料
Result r = iterator.next();
for(Cell cell : r.rawCells()){
//将得到的rowkey放置于集合容器中
rowkeys.add(CellUtil.cloneRow(cell));
}
}
}
//c.2、将取出的微網誌rowkey放置于目前操作使用者的收件箱中
if(rowkeys.size() <= 0) return;
//得到微網誌收件箱表的操作對象
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//用于存放多個關注的使用者的釋出的多條微網誌rowkey資訊
List<Put> recPuts = new ArrayList<Put>();
for(byte[] rk : rowkeys){
Put put = new Put(Bytes.toBytes(uid));
//uid_timestamp
String rowKey = Bytes.toString(rk);
//借取uid
String attendUID = rowKey.substring(0, rowKey.indexOf("_"));
long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1));
//将微網誌rowkey添加到指定單元格中
put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk);
recPuts.add(put);
}
recTBL.put(recPuts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
2.8 移除(取關)使用者
a、在微網誌使用者關系表中,對目前主動操作的使用者移除取關的好友(attends)
b、在微網誌使用者關系表中,對被取關的使用者移除粉絲
c、微網誌收件箱中删除取關的使用者釋出的微網誌
代碼:public void removeAttends(String uid, String... attends)
/**
* 取消關注(remove)
* a、在微網誌使用者關系表中,對目前主動操作的使用者删除對應取關的好友
* b、在微網誌使用者關系表中,對被取消關注的人删除粉絲(目前操作人)
* c、從收件箱中,删除取關的人的微網誌的rowkey
*/
public void removeAttends(String uid, String... attends){
//過濾資料
if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return;
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、在微網誌使用者關系表中,删除已關注的好友
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//待删除的使用者關系表中的所有資料
List<Delete> deletes = new ArrayList<Delete>();
//目前取關操作者的uid對應的Delete對象
Delete attendDelete = new Delete(Bytes.toBytes(uid));
//周遊取關,同時每次取關都要将被取關的人的粉絲-1
for(String attend : attends){
attendDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
//b
Delete fansDelete = new Delete(Bytes.toBytes(attend));
fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deletes.add(fansDelete);
}
deletes.add(attendDelete);
relationsTBL.delete(deletes);
//c、删除取關的人的微網誌rowkey 從 收件箱表中
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
Delete recDelete = new Delete(Bytes.toBytes(uid));
for(String attend : attends){
recDelete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes(attend));
}
recTBL.delete(recDelete);
} catch (IOException e) {
e.printStackTrace();
}
}
2.9 擷取關注的人的微網誌内容
a、從微網誌收件箱中擷取所關注的使用者的微網誌RowKey
b、根據擷取的RowKey,得到微網誌内容
代碼實作:public List<Message> getAttendsContent(String uid)
/**
* 擷取微網誌實際内容
* a、從微網誌收件箱中擷取所有關注的人的釋出的微網誌的rowkey
* b、根據得到的rowkey去微網誌内容表中得到資料
* c、将得到的資料封裝到Message對象中
*/
public List<Message> getAttendsContent(String uid){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//a、從收件箱中取得微網誌rowKey
Get get = new Get(Bytes.toBytes(uid));
//設定最大版本号
get.setMaxVersions(5);
List<byte[]> rowkeys = new ArrayList<byte[]>();
Result result = recTBL.get(get);
for(Cell cell : result.rawCells()){
rowkeys.add(CellUtil.cloneValue(cell));
}
//b、根據取出的所有rowkey去微網誌内容表中檢索資料
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
List<Get> gets = new ArrayList<Get>();
//根據rowkey取出對應微網誌的具體内容
for(byte[] rk : rowkeys){
Get g = new Get(rk);
gets.add(g);
}
//得到所有的微網誌内容的result對象
Result[] results = contentTBL.get(gets);
List<Message> messages = new ArrayList<Message>();
for(Result res : results){
for(Cell cell : res.rawCells()){
Message message = new Message();
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String userid = rowKey.substring(0, rowKey.indexOf("_"));
String timestamp = rowKey.substring(rowKey.indexOf("_") + 1);
String content = Bytes.toString(CellUtil.cloneValue(cell));
message.setContent(content);
message.setTimestamp(timestamp);
message.setUid(userid);
messages.add(message);
}
}
return messages;
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
8.2.10 測試
-- 測試釋出微網誌内容
public void testPublishContent(WeiBo wb)
-- 測試添加關注
public void testAddAttend(WeiBo wb)
-- 測試取消關注
public void testRemoveAttend(WeiBo wb)
-- 測試展示内容
public void testShowMessage(WeiBo wb)
代碼:
/**
* 釋出微網誌内容
* 添加關注
* 取消關注
* 展示内容
*/
public void testPublishContent(WeiBo wb){
wb.publishContent("0001", "今天買了一包空氣,送了點薯片,非常開心!!");
wb.publishContent("0001", "今天天氣不錯。");
}
public void testAddAttend(WeiBo wb){
wb.publishContent("0008", "準備下課!");
wb.publishContent("0009", "準備關機!");
wb.addAttends("0001", "0008", "0009");
}
public void testRemoveAttend(WeiBo wb){
wb.removeAttends("0001", "0008");
}
public void testShowMessage(WeiBo wb){
List<Message> messages = wb.getAttendsContent("0001");
for(Message message : messages){
System.out.println(message);
}
}
public static void main(String[] args) {
WeiBo weibo = new WeiBo();
weibo.initTable();
weibo.testPublishContent(weibo);
weibo.testAddAttend(weibo);
weibo.testShowMessage(weibo);
weibo.testRemoveAttend(weibo);
weibo.testShowMessage(weibo);
}
全部代碼筆記:
固定命名類Contants:
public class Contants {
//命名空間
public static final String NAME_SPACE = "weibo";
//使用者關系表
public static final String RELATION_TABLE = NAME_SPACE + ":relation";
//微網誌内容表
public static final String CONTENT_TABLE = NAME_SPACE + ":content";
//收件箱表
public static final String INBOX_TABLE = NAME_SPACE + ":inbox";
}
業務工具類:WeiBoUtil:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
public class WeiBoUtil {
//擷取hbase配置資訊
private static Configuration configuration = HBaseConfiguration.create();
static {
configuration.set("hbase.zookeeper.quorum", "192.168.9.102");
}
/**
* 建立命名空間
*/
public static void createNamespace(String ns) throws IOException {
//擷取hbase管理者對象
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
//建構命名空間描述器
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();
//建立namespace
admin.createNamespace(namespaceDescriptor);
admin.close();
connection.close();
}
/**
* 建立表
*/
public static void createTable(String tableName, int versions, String... cfs) throws IOException {
//擷取hbase管理者對象
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String cf : cfs) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
hColumnDescriptor.setMaxVersions(versions);
hTableDescriptor.addFamily(hColumnDescriptor);
}
admin.createTable(hTableDescriptor);
admin.close();
connection.close();
}
public static void putData(String tableName, String uid, String cf, String cn, String value) throws IOException {
Connection connection = ConnectionFactory.createConnection(configuration);
Table table = connection.getTable(TableName.valueOf(tableName));
//封裝put
long ts = System.currentTimeMillis();
String rowkey = uid + "_" + ts;
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), ts, Bytes.toBytes(value));
//執行操作
table.put(put);
//更新收件箱表
Table inboxTable = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
Table relationTable = connection.getTable(TableName.valueOf(Contants.RELATION_TABLE));
Get get = new Get(Bytes.toBytes(uid));
Result result = relationTable.get(get);
ArrayList<Put> puts = new ArrayList<>();
for (Cell cell : result.rawCells()) {
if ("fans".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
byte[] inboxRowkey = CellUtil.cloneQualifier(cell);
Put inboxPut = new Put(inboxRowkey);
inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(rowkey));
puts.add(inboxPut);
}
}
inboxTable.put(puts);
table.close();
inboxTable.close();
connection.close();
}
/**
* 添加關注使用者(多個)
* 1.在使用者關系表中,給目前使用者添加attends
* 2.在使用者關系表中,給被關注使用者添加fans
* 3.在收件箱表中,給目前使用者添加關注使用者最近所發微網誌的rowkey
*/
public static void addAttends(String uid, String... attends) throws IOException {
//1.在使用者關系表中,給目前使用者添加attends
Connection connection = ConnectionFactory.createConnection(configuration);
Table table = connection.getTable(TableName.valueOf(Contants.RELATION_TABLE));
Put attendPut = new Put(Bytes.toBytes(uid));
//存放被關注使用者的添加對象
ArrayList<Put> puts = new ArrayList<>();
puts.add(attendPut);
for (String attend : attends) {
attendPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(""));
//2.在使用者關系表中,給被關注使用者添加fans
Put put = new Put(Bytes.toBytes(attend));
put.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(""));
puts.add(put);
}
table.put(puts);
//3.在收件箱表中,給目前使用者添加關注使用者最近所發微網誌的rowkey
Table inboxTabel = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
Table contentTable = connection.getTable(TableName.valueOf(Contants.CONTENT_TABLE));
Put inboxPut = new Put(Bytes.toBytes(uid));
if (attends.length <= 0) {
return;
}
//循環添加要增加的資料
for (String attend : attends) {
//通過startRow和stopRow建構掃描器
// Scan scan = new Scan(Bytes.toBytes(attend), Bytes.toBytes(attend + "|"));
//通過過濾器建構掃描器
Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
scan.setFilter(rowFilter);
//擷取所有符合掃描規則的額資料
ResultScanner scanner = contentTable.getScanner(scan);
//循環周遊取出每條資料的rowkey添加到inboxPut中
for (Result result : scanner) {
byte[] row = result.getRow();
inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(attend), row);
//往收件箱表中給操作者添加資料
inboxTabel.put(inboxPut);
}
}
//關閉資源
inboxTabel.close();
contentTable.close();
table.close();
connection.close();
}
/**
* 取關
* 1.在使用者關系表中,删除目前使用者的attends
* 2.在使用者關系表中,删除被取關使用者的fans(操作者)
* 3.在收件箱表中删除取關使用者的所有資料
*/
public static void deleteRelation(String uid, String... deletes) throws IOException {
Connection connection = ConnectionFactory.createConnection(configuration);
//擷取使用者關系表對象
Table relationTable = connection.getTable(TableName.valueOf(Contants.RELATION_TABLE));
//存放關系表中所有要輸出的對象的集合
ArrayList<Delete> deleteArrayList = new ArrayList<>();
// 1.在使用者關系表中,删除目前使用者的attends
Delete userDelete = new Delete(Bytes.toBytes(uid));
for (String delete : deletes) {
//給目前使用者添加要删除的列
userDelete.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(delete));
//2.在使用者關系表中,删除被取關使用者的fans(操作者)
Delete fanDelete = new Delete(Bytes.toBytes(delete));
//給被關注這添加删除的列
fanDelete.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deleteArrayList.add(fanDelete);
}
deleteArrayList.add(userDelete);
//使用者關系表删除操作
relationTable.delete(deleteArrayList);
//3.在收件箱表中删除取關使用者的所有資料
Table inboxTable = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
Delete inboxDelete = new Delete(Bytes.toBytes(uid));
//循環添加要删除内容
for (String delete : deletes) {
inboxDelete.addColumns(Bytes.toBytes("info"), Bytes.toBytes(delete));
}
inboxTable.delete(inboxDelete);
//關閉資源
relationTable.close();
inboxTable.close();
connection.close();
}
/**
* 擷取關注的人的微網誌内容
*/
public static void getWeiBo(String uid) throws IOException {
//擷取微網誌内容表及收件箱表對象
Connection connection = ConnectionFactory.createConnection(configuration);
Table inboxTable = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
Table contentTable = connection.getTable(TableName.valueOf(Contants.CONTENT_TABLE));
Get get = new Get(Bytes.toBytes(uid));
get.setMaxVersions(3);
Result result = inboxTable.get(get);
for (Cell cell : result.rawCells()) {
byte[] contentRowkey = CellUtil.cloneValue(cell);
Get contentGet = new Get(contentRowkey);
Result contentResult = contentTable.get(contentGet);
for (Cell cell1 : contentResult.rawCells()) {
String uid_ts = Bytes.toString(CellUtil.cloneRow(cell1));
String id = uid_ts.split("_")[0];
String ts = uid_ts.split("_")[1];
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(Long.parseLong(ts)));
System.out.println("使用者:" + id + ",時間" + date + ",内容:" + Bytes.toString(CellUtil.cloneValue(cell1)));
}
}
inboxTable.close();
contentTable.close();
connection.close();
}
}
測試主類WeiBo:
import java.io.IOException;
public class Weibo {
public static void init() throws IOException {
WeiBoUtil.createNamespace(Contants.NAME_SPACE);
//建立使用者關系表
WeiBoUtil.createTable(Contants.RELATION_TABLE, 1, "attends", "fans");
//建立微網誌内容表
WeiBoUtil.createTable(Contants.CONTENT_TABLE, 1, "info");
//建立收件箱表
WeiBoUtil.createTable(Contants.INBOX_TABLE, 100, "info");
}
public static void main(String[] args) throws IOException {
// init();
//關注
// WeiBoUtil.addAttends("1001", "1002", "1003");
//被關注的人發微網誌(多個人發微網誌)
// WeiBoUtil.putData(Contants.CONTENT_TABLE, "1002", "info", "content", "今天天氣真晴朗!");
// WeiBoUtil.putData(Contants.CONTENT_TABLE, "1002", "info", "content", "春困秋乏!");
// WeiBoUtil.putData(Contants.CONTENT_TABLE, "1003", "info", "content", "夏打盹!");
// WeiBoUtil.putData(Contants.CONTENT_TABLE, "1001", "info", "content", "冬眠睡不醒!");
//擷取關注人的微網誌
WeiBoUtil.getWeiBo("1001");
//關注已經發過微網誌的人
// WeiBoUtil.addAttends("1002", "1001");
//擷取關注人的微網誌
// WeiBoUtil.getWeiBo("1002");
//取消關注
WeiBoUtil.deleteRelation("1001","1002");
//擷取關注人的微網誌
WeiBoUtil.getWeiBo("1001");
}
}
運作擷取關注人的微網誌
關注已發微網誌的人并擷取其釋出的微網誌: