需求:繪製管道使用者的每日趨勢(每分鐘一組資料,一天1440組;2000+個管道,區分新/老使用者,2*1440*2000+=576萬+/每天),需要儲存90天。
查詢條件:管道號、新or老使用者、日期
rowkey:管道_日期_新or老使用者_小時分鐘(hhmm)
連接HBase:
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport
from hbase import Hbase
def hbase_connect(host='*', port=9090):
    """Open a Thrift connection to HBase and return it with a client.

    Args:
        host: Host name or address of the HBase Thrift server. The default
            '*' is the placeholder value this function was originally
            hard-coded with; pass the real server address.
        port: TCP port of the HBase Thrift server (default 9090).

    Returns:
        A ``(hbase_transport, hbase_client)`` tuple. ``hbase_transport`` is
        the already-opened buffered transport; the caller is responsible
        for calling ``hbase_transport.close()`` when done.
    """
    sock = TSocket.TSocket(host, port)
    hbase_transport = TTransport.TBufferedTransport(sock)
    # The protocol must be layered on the buffered transport, not on the raw
    # socket; otherwise the buffering wrapper that is opened and returned
    # below is never used for the actual reads and writes.
    protocol = TBinaryProtocol.TBinaryProtocol(hbase_transport)
    hbase_client = Hbase.Client(protocol)
    hbase_transport.open()
    return hbase_transport, hbase_client
建立表:
def create_hbase_table():
    """Create the 'client_rt_pv' table with a single 'default' column family.

    Prints the list of table names before and after the creation so the
    effect can be checked by eye. The Thrift transport obtained from
    ``hbase_connect()`` is always closed, even if the creation fails.
    Any error raised by the Thrift client (for example when the table
    already exists) is propagated to the caller unchanged.
    """
    transport, client = hbase_connect()
    try:
        # Single-argument print(...) prints the same under Python 2 and 3.
        print(client.getTableNames())
        client.createTable(
            'client_rt_pv', [Hbase.ColumnDescriptor(name='default')])
        print(client.getTableNames())
    finally:
        # Release the connection; previously it was left open.
        transport.close()
插入資料:
# Accumulates one BatchMutation per row so that all rows are written to HBase
# with a single mutateRows call at the end.
mutationsbatch = []
# --- begin loop (loop header is elided in this snippet) ---
# tmp_pub, dayStr, tmp_ct, tmp_pv, tmp_uv and pub_id are defined outside the
# visible snippet. The row key is the four parts joined by '_'; the literal
# 'ac' presumably fills the new/old-user slot of the row key layout
# (channel_date_new-or-old_hhmm) -- TODO confirm.
rowkey = '_'.join([tmp_pub, dayStr, 'ac', tmp_ct])
# One Mutation per cell, all in the 'default' column family; every value is
# written as a string.
mutations = [
Hbase.Mutation(column="default:pv", value=str(tmp_pv)),
Hbase.Mutation(column="default:uv", value=str(tmp_uv)),
# pv/uv ratio formatted with two decimals; written as 0 when tmp_uv is 0 to
# avoid a division by zero.
Hbase.Mutation(column="default:pvdivuv", value=str('%.2f' % (tmp_pv/float(tmp_uv) if tmp_uv != 0 else 0, ))),
Hbase.Mutation(column="default:tm", value=str(tmp_ct)),
# The stored value is tmp_pub with a single trailing space appended; the
# reason for the space is not visible here.
Hbase.Mutation(column="default:pub", value=str("".join([tmp_pub, ' ']))),
# Looks up tmp_pub in pub_id; falls back to 'unknown' when the key is missing.
Hbase.Mutation(column="default:pubname", value=pub_id.get(tmp_pub, 'unknown'))]
mutationsbatch.append(Hbase.BatchMutation(row=rowkey, mutations=mutations))
# --- end loop ---
# Write all accumulated rows to table "client_rt_pv" in one call. The third
# argument is passed as None (presumably the optional attributes map of the
# HBase Thrift API -- TODO confirm).
hbase_client.mutateRows("client_rt_pv", mutationsbatch, None)
# NOTE(review): this closes `hive_transport`, which is not defined in the
# visible snippet, while the HBase transport is not closed here. Possibly a
# typo for `hbase_transport` -- confirm against the full script.
hive_transport.close()