天天看點

HBase項目之谷粒微網誌:建立命名空間,微網誌内容表,使用者關系表,微網誌收件箱表,釋出微網誌内容,添加關注使用者,移除(取關)使用者,擷取關注的人的微網誌内容,HBase實戰項目Hbase實戰之谷粒微網誌2.2 建立命名空間以及表名的定義2.3 建立微網誌内容表2.4 建立使用者關系表2.5 建立微網誌收件箱表2.6 釋出微網誌内容2.7 添加關注使用者2.9 擷取關注的人的微網誌内容

Hbase實戰之谷粒微網誌

1 需求分析

1) 微網誌内容的浏覽,資料庫表設計

2) 使用者社交展現:關注使用者,取關使用者

3) 拉取關注的人的微網誌内容

2 代碼實作

2.1 代碼設計總覽:

1) 建立命名空間以及表名的定義

2) 建立微網誌内容表

3) 建立使用者關系表

4) 建立使用者微網誌内容接收郵件表

5) 釋出微網誌内容

6) 添加關注使用者

7) 移除(取關)使用者

8) 擷取關注的人的微網誌内容

9) 測試

資料庫關系表

HBase項目之谷粒微網誌:建立命名空間,微網誌内容表,使用者關系表,微網誌收件箱表,釋出微網誌内容,添加關注使用者,移除(取關)使用者,擷取關注的人的微網誌内容,HBase實戰項目Hbase實戰之谷粒微網誌2.2 建立命名空間以及表名的定義2.3 建立微網誌内容表2.4 建立使用者關系表2.5 建立微網誌收件箱表2.6 釋出微網誌内容2.7 添加關注使用者2.9 擷取關注的人的微網誌内容

2.2 建立命名空間以及表名的定義

//擷取配置conf
private Configuration conf = HBaseConfiguration.create();

//微網誌内容表的表名
private static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
//使用者關系表的表名
private static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
//微網誌收件箱表的表名
private static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");
public void initNamespace(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		//命名空間類似于關系型資料庫中的schema,可以想象成檔案夾
		NamespaceDescriptor weibo = NamespaceDescriptor
				.create("weibo")
				.addConfiguration("creator", "Jinji")
				.addConfiguration("create_time", System.currentTimeMillis() + "")
				.build();
		admin.createNamespace(weibo);
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           
HBase項目之谷粒微網誌:建立命名空間,微網誌内容表,使用者關系表,微網誌收件箱表,釋出微網誌内容,添加關注使用者,移除(取關)使用者,擷取關注的人的微網誌内容,HBase實戰項目Hbase實戰之谷粒微網誌2.2 建立命名空間以及表名的定義2.3 建立微網誌内容表2.4 建立使用者關系表2.5 建立微網誌收件箱表2.6 釋出微網誌内容2.7 添加關注使用者2.9 擷取關注的人的微網誌内容

2.3 建立微網誌内容表

表結構:

方法名 creatTableeContent
Table Name weibo:content
RowKey 使用者ID_時間戳
ColumnFamily info
ColumnLabel 标題,内容,圖檔
Version 1個版本

代碼:

/**
 * 建立微網誌内容表
 * Table Name:weibo:content
 * RowKey:使用者ID_時間戳
 * ColumnFamily:info
 * ColumnLabel:标題	内容		圖檔URL
 * Version:1個版本
 */
public void createTableContent(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		//建立表表述
		HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
		//建立列族描述
		HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
		//設定塊緩存
		info.setBlockCacheEnabled(true);
		//設定塊緩存大小
		info.setBlocksize(2097152);
		//設定壓縮方式
//			info.setCompressionType(Algorithm.SNAPPY);
		//設定版本确界
		info.setMaxVersions(1);
		info.setMinVersions(1);
		
		content.addFamily(info);
		admin.createTable(content);
		
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

2.4 建立使用者關系表

表結構:

方法名 createTableRelations
Table Name weibo:relations
RowKey 使用者ID
ColumnFamily attends、fans
ColumnLabel 關注使用者ID,粉絲使用者ID
ColumnValue 使用者ID
Version 1個版本

代碼:

/**
 * 使用者關系表
 * Table Name:weibo:relations
 * RowKey:使用者ID
 * ColumnFamily:attends,fans
 * ColumnLabel:關注使用者ID,粉絲使用者ID
 * ColumnValue:使用者ID
 * Version:1個版本
 */
public void createTableRelations(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
		
		//關注的人的列族
		HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends"));
		//設定塊緩存
		attends.setBlockCacheEnabled(true);
		//設定塊緩存大小
		attends.setBlocksize(2097152);
		//設定壓縮方式
//			info.setCompressionType(Algorithm.SNAPPY);
		//設定版本确界
		attends.setMaxVersions(1);
		attends.setMinVersions(1);
		
		//粉絲列族
		HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
		fans.setBlockCacheEnabled(true);
		fans.setBlocksize(2097152);
		fans.setMaxVersions(1);
		fans.setMinVersions(1);
		
		
		relations.addFamily(attends);
		relations.addFamily(fans);
		admin.createTable(relations);
		
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

2.5 建立微網誌收件箱表

表結構:

方法名 createTableReceiveContentEmails
Table Name weibo:receive_content_email
RowKey 使用者ID
ColumnFamily info
ColumnLabel 使用者ID
ColumnValue 取微網誌内容的RowKey
Version 1000

代碼:

/**
 * 建立微網誌收件箱表
 * Table Name: weibo:receive_content_email
 * RowKey:使用者ID
 * ColumnFamily:info
 * ColumnLabel:使用者ID-釋出微網誌的人的使用者ID
 * ColumnValue:關注的人的微網誌的RowKey
 * Version:1000
 */
public void createTableReceiveContentEmail(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
		
		info.setBlockCacheEnabled(true);
		info.setBlocksize(2097152);
		info.setMaxVersions(1000);
		info.setMinVersions(1000);
		
		receive_content_email.addFamily(info);;
		admin.createTable(receive_content_email);
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

2.6 釋出微網誌内容

a、微網誌内容表中添加1條資料

b、微網誌收件箱表對所有粉絲使用者添加資料

代碼:Message.java

public class Message {
	private String uid;
	private String timestamp;
	private String content;
	
	public String getUid() {
		return uid;
	}
	public void setUid(String uid) {
		this.uid = uid;
	}
	public String getTimestamp() {
		return timestamp;
	}
	public void setTimestamp(String timestamp) {
		this.timestamp = timestamp;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	@Override
	public String toString() {
		return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
	}
}
           

代碼:public void publishContent(String uid, String content)

/**
 * 釋出微網誌
 * a、微網誌内容表中資料+1
 * b、向微網誌收件箱表中加入微網誌的Rowkey
 */
public void publishContent(String uid, String content){
	HConnection connection = null;
	try {
		connection = HConnectionManager.createConnection(conf);
		//a、微網誌内容表中添加1條資料,首先擷取微網誌内容表描述
		HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
		//組裝Rowkey
		long timestamp = System.currentTimeMillis();
		String rowKey = uid + "_" + timestamp;
		
		Put put = new Put(Bytes.toBytes(rowKey));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content));
		
		contentTBL.put(put);
		
		//b、向微網誌收件箱表中加入釋出的Rowkey
		//b.1、查詢使用者關系表,得到目前使用者有哪些粉絲
		HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
		//b.2、取出目标資料
		Get get = new Get(Bytes.toBytes(uid));
		get.addFamily(Bytes.toBytes("fans"));
		
		Result result = relationsTBL.get(get);
		List<byte[]> fans = new ArrayList<byte[]>();
		
		//周遊取出目前釋出微網誌的使用者的所有粉絲資料
		for(Cell cell : result.rawCells()){
			fans.add(CellUtil.cloneQualifier(cell));
		}
		//如果該使用者沒有粉絲,則直接return
		if(fans.size() <= 0) return;
		//開始操作收件箱表
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		List<Put> puts = new ArrayList<Put>();
		for(byte[] fan : fans){
			Put fanPut = new Put(fan);
			fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
			puts.add(fanPut);
		}
		recTBL.put(puts);
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != connection){
			try {
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

2.7 添加關注使用者

a、在微網誌使用者關系表中,對目前主動操作的使用者添加新關注的好友

b、在微網誌使用者關系表中,對被關注的使用者添加新的粉絲

c、微網誌收件箱表中添加所關注的使用者釋出的微網誌

代碼實作:public void addAttends(String uid, String... attends)

/**
 * 關注使用者邏輯
 * a、在微網誌使用者關系表中,對目前主動操作的使用者添加新的關注的好友
 * b、在微網誌使用者關系表中,對被關注的使用者添加粉絲(目前操作的使用者)
 * c、目前操作使用者的微網誌收件箱添加所關注的使用者釋出的微網誌rowkey
 */
public void addAttends(String uid, String... attends){
	//參數過濾
	if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
		return;
	}
	HConnection connection = null;
	try {
		connection = HConnectionManager.createConnection(conf);
		//使用者關系表操作對象(連接配接到使用者關系表)
		HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
		List<Put> puts = new ArrayList<Put>();
		//a、在微網誌使用者關系表中,添加新關注的好友
		Put attendPut = new Put(Bytes.toBytes(uid));
		for(String attend : attends){
			//為目前使用者添加關注的人
			attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
			//b、為被關注的人,添加粉絲
			Put fansPut = new Put(Bytes.toBytes(attend));
			fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
			//将所有關注的人一個一個的添加到puts(List)集合中
			puts.add(fansPut);
		}
		puts.add(attendPut);
		relationsTBL.put(puts);
		
		//c.1、微網誌收件箱添加關注的使用者釋出的微網誌内容(content)的rowkey
		HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
		Scan scan = new Scan();
		//用于存放取出來的關注的人所釋出的微網誌的rowkey
		List<byte[]> rowkeys = new ArrayList<byte[]>();
		
		for(String attend : attends){
			//過濾掃描rowkey,即:前置位比對被關注的人的uid_
			RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
			//為掃描對象指定過濾規則
			scan.setFilter(filter);
			//通過掃描對象得到scanner
			ResultScanner result = contentTBL.getScanner(scan);
			//疊代器周遊掃描出來的結果集
			Iterator<Result> iterator = result.iterator();
			while(iterator.hasNext()){
				//取出每一個符合掃描結果的那一行資料
				Result r = iterator.next();
				for(Cell cell : r.rawCells()){
					//将得到的rowkey放置于集合容器中
					rowkeys.add(CellUtil.cloneRow(cell));
				}
				
			}
		}

//c.2、将取出的微網誌rowkey放置于目前操作使用者的收件箱中
		if(rowkeys.size() <= 0) return;
		//得到微網誌收件箱表的操作對象
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		//用于存放多個關注的使用者的釋出的多條微網誌rowkey資訊
		List<Put> recPuts = new ArrayList<Put>();
		for(byte[] rk : rowkeys){
			Put put = new Put(Bytes.toBytes(uid));
			//uid_timestamp
			String rowKey = Bytes.toString(rk);
			//借取uid
			String attendUID = rowKey.substring(0, rowKey.indexOf("_"));
			long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1));
			//将微網誌rowkey添加到指定單元格中
			put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk);
			recPuts.add(put);
		}
		
		recTBL.put(recPuts);
		
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != connection){
			try {
				connection.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}
           

2.8 移除(取關)使用者

a、在微網誌使用者關系表中,對目前主動操作的使用者移除取關的好友(attends)

b、在微網誌使用者關系表中,對被取關的使用者移除粉絲

c、微網誌收件箱中删除取關的使用者釋出的微網誌

代碼:public void removeAttends(String uid, String... attends)

/**
 * 取消關注(remove)
 * a、在微網誌使用者關系表中,對目前主動操作的使用者删除對應取關的好友
 * b、在微網誌使用者關系表中,對被取消關注的人删除粉絲(目前操作人)
 * c、從收件箱中,删除取關的人的微網誌的rowkey
 */
public void removeAttends(String uid, String... attends){
	//過濾資料
	if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return;
	HConnection connection = null;
	
	try {
		connection = HConnectionManager.createConnection(conf);
		//a、在微網誌使用者關系表中,删除已關注的好友
		HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
		
		//待删除的使用者關系表中的所有資料
		List<Delete> deletes = new ArrayList<Delete>();
		//目前取關操作者的uid對應的Delete對象
		Delete attendDelete = new Delete(Bytes.toBytes(uid));
		//周遊取關,同時每次取關都要将被取關的人的粉絲-1
		for(String attend : attends){
			attendDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
			//b
			Delete fansDelete = new Delete(Bytes.toBytes(attend));
			fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
			deletes.add(fansDelete);
		}
		
		deletes.add(attendDelete);
		relationsTBL.delete(deletes);
		
		//c、删除取關的人的微網誌rowkey 從 收件箱表中
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		
		Delete recDelete = new Delete(Bytes.toBytes(uid));
		for(String attend : attends){
			recDelete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes(attend));
		}
		recTBL.delete(recDelete);
	} catch (IOException e) {
		e.printStackTrace();
	}
}
           

2.9 擷取關注的人的微網誌内容

a、從微網誌收件箱中擷取所關注的使用者的微網誌RowKey

b、根據擷取的RowKey,得到微網誌内容

代碼實作:public List<Message> getAttendsContent(String uid)

/**
 * 擷取微網誌實際内容
 * a、從微網誌收件箱中擷取所有關注的人的釋出的微網誌的rowkey
 * b、根據得到的rowkey去微網誌内容表中得到資料
 * c、将得到的資料封裝到Message對象中
 */
public List<Message> getAttendsContent(String uid){
	HConnection connection = null;
	try {
		connection = HConnectionManager.createConnection(conf);
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		//a、從收件箱中取得微網誌rowKey
		Get get = new Get(Bytes.toBytes(uid));
		//設定最大版本号
		get.setMaxVersions(5);
		List<byte[]> rowkeys = new ArrayList<byte[]>();
		Result result = recTBL.get(get);
		for(Cell cell : result.rawCells()){
			rowkeys.add(CellUtil.cloneValue(cell));
		}
		//b、根據取出的所有rowkey去微網誌内容表中檢索資料
		HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
		List<Get> gets = new ArrayList<Get>();
		//根據rowkey取出對應微網誌的具體内容
		for(byte[] rk : rowkeys){
			Get g = new Get(rk);
			gets.add(g);
		}
		//得到所有的微網誌内容的result對象
		Result[] results = contentTBL.get(gets);
		
		List<Message> messages = new ArrayList<Message>();
		for(Result res : results){
			for(Cell cell : res.rawCells()){
				Message message = new Message();
				
				String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
				String userid = rowKey.substring(0, rowKey.indexOf("_"));
				String timestamp = rowKey.substring(rowKey.indexOf("_") + 1);
				String content = Bytes.toString(CellUtil.cloneValue(cell));
				
				message.setContent(content);
				message.setTimestamp(timestamp);
				message.setUid(userid);
				
				messages.add(message);
			}
		}
		return messages;
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		try {
			connection.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	return null;
}
           

8.2.10 測試

-- 測試釋出微網誌内容

public void testPublishContent(WeiBo wb)

-- 測試添加關注

public void testAddAttend(WeiBo wb)

-- 測試取消關注

public void testRemoveAttend(WeiBo wb)

-- 測試展示内容

public void testShowMessage(WeiBo wb)

代碼:

/**
 * 釋出微網誌内容
 * 添加關注
 * 取消關注
 * 展示内容
 */
public void testPublishContent(WeiBo wb){
	wb.publishContent("0001", "今天買了一包空氣,送了點薯片,非常開心!!");
	wb.publishContent("0001", "今天天氣不錯。");
}

public void testAddAttend(WeiBo wb){
	wb.publishContent("0008", "準備下課!");
	wb.publishContent("0009", "準備關機!");
	wb.addAttends("0001", "0008", "0009");
}

public void testRemoveAttend(WeiBo wb){
	wb.removeAttends("0001", "0008");
}

public void testShowMessage(WeiBo wb){
	List<Message> messages = wb.getAttendsContent("0001");
	for(Message message : messages){
		System.out.println(message);
	}
}

public static void main(String[] args) {
	WeiBo weibo = new WeiBo();
	weibo.initTable();
	
	weibo.testPublishContent(weibo);
	weibo.testAddAttend(weibo);
	weibo.testShowMessage(weibo);
	weibo.testRemoveAttend(weibo);
	weibo.testShowMessage(weibo);
}
           

全部代碼筆記:

固定命名類Contants:

public class Contants {
    //命名空間
    public static final String NAME_SPACE = "weibo";
    //使用者關系表
    public static final String RELATION_TABLE = NAME_SPACE + ":relation";
    //微網誌内容表
    public static final String CONTENT_TABLE = NAME_SPACE + ":content";
    //收件箱表
    public static final String INBOX_TABLE = NAME_SPACE + ":inbox";
}
           

業務工具類:WeiBoUtil:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

public class WeiBoUtil {

    //擷取hbase配置資訊
    private static Configuration configuration = HBaseConfiguration.create();

    static {
        configuration.set("hbase.zookeeper.quorum", "192.168.9.102");
    }

    /**
     * 建立命名空間
     */
    public static void createNamespace(String ns) throws IOException {
        //擷取hbase管理者對象
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();

        //建構命名空間描述器
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();

        //建立namespace
        admin.createNamespace(namespaceDescriptor);

        admin.close();
        connection.close();
    }

    /**
     * 建立表
     */
    public static void createTable(String tableName, int versions, String... cfs) throws IOException {

        //擷取hbase管理者對象
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();

        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String cf : cfs) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
            hColumnDescriptor.setMaxVersions(versions);
            hTableDescriptor.addFamily(hColumnDescriptor);
        }

        admin.createTable(hTableDescriptor);

        admin.close();
        connection.close();
    }

    public static void putData(String tableName, String uid, String cf, String cn, String value) throws IOException {

        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(tableName));

        //封裝put
        long ts = System.currentTimeMillis();
        String rowkey = uid + "_" + ts;
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), ts, Bytes.toBytes(value));

        //執行操作
        table.put(put);

        //更新收件箱表
        Table inboxTable = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
        Table relationTable = connection.getTable(TableName.valueOf(Contants.RELATION_TABLE));

        Get get = new Get(Bytes.toBytes(uid));
        Result result = relationTable.get(get);

        ArrayList<Put> puts = new ArrayList<>();

        for (Cell cell : result.rawCells()) {
            if ("fans".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                byte[] inboxRowkey = CellUtil.cloneQualifier(cell);

                Put inboxPut = new Put(inboxRowkey);
                inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(rowkey));

                puts.add(inboxPut);
            }
        }
        inboxTable.put(puts);

        table.close();
        inboxTable.close();
        connection.close();
    }

    /**
     * 添加關注使用者(多個)
     * 1.在使用者關系表中,給目前使用者添加attends
     * 2.在使用者關系表中,給被關注使用者添加fans
     * 3.在收件箱表中,給目前使用者添加關注使用者最近所發微網誌的rowkey
     */
    public static void addAttends(String uid, String... attends) throws IOException {

        //1.在使用者關系表中,給目前使用者添加attends
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(Contants.RELATION_TABLE));

        Put attendPut = new Put(Bytes.toBytes(uid));

        //存放被關注使用者的添加對象
        ArrayList<Put> puts = new ArrayList<>();

        puts.add(attendPut);

        for (String attend : attends) {
            attendPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(""));
            //2.在使用者關系表中,給被關注使用者添加fans
            Put put = new Put(Bytes.toBytes(attend));
            put.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(""));
            puts.add(put);
        }
        table.put(puts);

        //3.在收件箱表中,給目前使用者添加關注使用者最近所發微網誌的rowkey
        Table inboxTabel = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
        Table contentTable = connection.getTable(TableName.valueOf(Contants.CONTENT_TABLE));

        Put inboxPut = new Put(Bytes.toBytes(uid));

        if (attends.length <= 0) {
            return;
        }

        //循環添加要增加的資料
        for (String attend : attends) {
            //通過startRow和stopRow建構掃描器
//            Scan scan = new Scan(Bytes.toBytes(attend), Bytes.toBytes(attend + "|"));
            //通過過濾器建構掃描器
            Scan scan = new Scan();
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
            scan.setFilter(rowFilter);

            //擷取所有符合掃描規則的額資料
            ResultScanner scanner = contentTable.getScanner(scan);

            //循環周遊取出每條資料的rowkey添加到inboxPut中
            for (Result result : scanner) {
                byte[] row = result.getRow();
                inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(attend), row);
                //往收件箱表中給操作者添加資料
                inboxTabel.put(inboxPut);
            }
        }

        //關閉資源
        inboxTabel.close();
        contentTable.close();
        table.close();
        connection.close();
    }


    /**
     * 取關
     * 1.在使用者關系表中,删除目前使用者的attends
     * 2.在使用者關系表中,删除被取關使用者的fans(操作者)
     * 3.在收件箱表中删除取關使用者的所有資料
     */
    public static void deleteRelation(String uid, String... deletes) throws IOException {

        Connection connection = ConnectionFactory.createConnection(configuration);
        //擷取使用者關系表對象
        Table relationTable = connection.getTable(TableName.valueOf(Contants.RELATION_TABLE));

        //存放關系表中所有要輸出的對象的集合
        ArrayList<Delete> deleteArrayList = new ArrayList<>();

        // 1.在使用者關系表中,删除目前使用者的attends
        Delete userDelete = new Delete(Bytes.toBytes(uid));
        for (String delete : deletes) {
            //給目前使用者添加要删除的列
            userDelete.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(delete));

            //2.在使用者關系表中,删除被取關使用者的fans(操作者)
            Delete fanDelete = new Delete(Bytes.toBytes(delete));
            //給被關注這添加删除的列
            fanDelete.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
            deleteArrayList.add(fanDelete);
        }
        deleteArrayList.add(userDelete);

        //使用者關系表删除操作
        relationTable.delete(deleteArrayList);

        //3.在收件箱表中删除取關使用者的所有資料
        Table inboxTable = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
        Delete inboxDelete = new Delete(Bytes.toBytes(uid));

        //循環添加要删除内容
        for (String delete : deletes) {
            inboxDelete.addColumns(Bytes.toBytes("info"), Bytes.toBytes(delete));
        }
        inboxTable.delete(inboxDelete);

        //關閉資源
        relationTable.close();
        inboxTable.close();
        connection.close();
    }

    /**
     * 擷取關注的人的微網誌内容
     */
    public static void getWeiBo(String uid) throws IOException {

        //擷取微網誌内容表及收件箱表對象
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table inboxTable = connection.getTable(TableName.valueOf(Contants.INBOX_TABLE));
        Table contentTable = connection.getTable(TableName.valueOf(Contants.CONTENT_TABLE));

        Get get = new Get(Bytes.toBytes(uid));
        get.setMaxVersions(3);

        Result result = inboxTable.get(get);
        for (Cell cell : result.rawCells()) {
            byte[] contentRowkey = CellUtil.cloneValue(cell);
            Get contentGet = new Get(contentRowkey);
            Result contentResult = contentTable.get(contentGet);
            for (Cell cell1 : contentResult.rawCells()) {
                String uid_ts = Bytes.toString(CellUtil.cloneRow(cell1));
                String id = uid_ts.split("_")[0];
                String ts = uid_ts.split("_")[1];

                String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(Long.parseLong(ts)));
                System.out.println("使用者:" + id + ",時間" + date + ",内容:" + Bytes.toString(CellUtil.cloneValue(cell1)));
            }
        }

        inboxTable.close();
        contentTable.close();
        connection.close();
    }

}
           

測試主類WeiBo:

import java.io.IOException;

public class Weibo {

    public static void init() throws IOException {
        WeiBoUtil.createNamespace(Contants.NAME_SPACE);
        //建立使用者關系表
        WeiBoUtil.createTable(Contants.RELATION_TABLE, 1, "attends", "fans");
        //建立微網誌内容表
        WeiBoUtil.createTable(Contants.CONTENT_TABLE, 1, "info");
        //建立收件箱表
        WeiBoUtil.createTable(Contants.INBOX_TABLE, 100, "info");
    }


    public static void main(String[] args) throws IOException {
//        init();

        //關注
//        WeiBoUtil.addAttends("1001", "1002", "1003");

        //被關注的人發微網誌(多個人發微網誌)
//        WeiBoUtil.putData(Contants.CONTENT_TABLE, "1002", "info", "content", "今天天氣真晴朗!");
//        WeiBoUtil.putData(Contants.CONTENT_TABLE, "1002", "info", "content", "春困秋乏!");
//        WeiBoUtil.putData(Contants.CONTENT_TABLE, "1003", "info", "content", "夏打盹!");
//        WeiBoUtil.putData(Contants.CONTENT_TABLE, "1001", "info", "content", "冬眠睡不醒!");
        //擷取關注人的微網誌
        WeiBoUtil.getWeiBo("1001");

        //關注已經發過微網誌的人
//        WeiBoUtil.addAttends("1002", "1001");

        //擷取關注人的微網誌
//        WeiBoUtil.getWeiBo("1002");

        //取消關注
        WeiBoUtil.deleteRelation("1001","1002");

        //擷取關注人的微網誌
        WeiBoUtil.getWeiBo("1001");

    }
}
           

運作擷取關注人的微網誌

HBase項目之谷粒微網誌:建立命名空間,微網誌内容表,使用者關系表,微網誌收件箱表,釋出微網誌内容,添加關注使用者,移除(取關)使用者,擷取關注的人的微網誌内容,HBase實戰項目Hbase實戰之谷粒微網誌2.2 建立命名空間以及表名的定義2.3 建立微網誌内容表2.4 建立使用者關系表2.5 建立微網誌收件箱表2.6 釋出微網誌内容2.7 添加關注使用者2.9 擷取關注的人的微網誌内容

關注已發微網誌的人并擷取其釋出的微網誌:

HBase項目之谷粒微網誌:建立命名空間,微網誌内容表,使用者關系表,微網誌收件箱表,釋出微網誌内容,添加關注使用者,移除(取關)使用者,擷取關注的人的微網誌内容,HBase實戰項目Hbase實戰之谷粒微網誌2.2 建立命名空間以及表名的定義2.3 建立微網誌内容表2.4 建立使用者關系表2.5 建立微網誌收件箱表2.6 釋出微網誌内容2.7 添加關注使用者2.9 擷取關注的人的微網誌内容

繼續閱讀