天天看點

HBase rowkey設計執行個體

需求:繪制管道使用者的每日趨勢(每分鐘一組資料一天1440組,2000+個管道,區分新/老使用者,2*1440*2000+=576萬+/每天),需要儲存90天。

查詢條件:管道号、新or老使用者、日期

rowkey:管道_日期_新or老使用者_小時分鐘(hhmm)

連接配接HBase

from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport
from hbase import Hbase

def hbase_connect():
    transport = TSocket.TSocket('*', 9090)
    # transport = TSocket.TSocket('10.50.14.105', 9090)
    hbase_transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    hbase_client = Hbase.Client(protocol)
    hbase_transport.open()
    return hbase_transport, hbase_client      

建立表:

def create_hbase_table():
    transport, client = hbase_connect()
    tables = client.getTableNames()
    print tables
    client.createTable('client_rt_pv', [Hbase.ColumnDescriptor(name = 'default')])
    tables = client.getTableNames()
    print tables      

插入資料:

mutationsbatch = []

#### loop
rowkey = '_'.join([tmp_pub, dayStr, 'ac', tmp_ct])
mutations = [
    Hbase.Mutation(column="default:pv", value=str(tmp_pv)),
    Hbase.Mutation(column="default:uv", value=str(tmp_uv)),
    Hbase.Mutation(column="default:pvdivuv", value=str('%.2f' % (tmp_pv/float(tmp_uv) if tmp_uv != 0 else 0, ))),
    Hbase.Mutation(column="default:tm", value=str(tmp_ct)),
    Hbase.Mutation(column="default:pub", value=str("".join([tmp_pub, ' ']))),
    Hbase.Mutation(column="default:pubname", value=pub_id.get(tmp_pub, 'unknown'))]

mutationsbatch.append(Hbase.BatchMutation(row=rowkey, mutations=mutations))

### end loop
hbase_client.mutateRows("client_rt_pv", mutationsbatch, None)
hive_transport.close()      

繼續閱讀