讀取hdfs中檔案并做處理,取出卡号,通過卡号連接配接hbase查詢出對應客戶号,寫入redis,因為不用輸出,是以不調用context.write方法,整個操作在一個map中便可完成
protected HTable connect
//setup方法被MapReduce架構僅且執行一次,在執行Map任務前,進行相關變量或者資源的集中初始化工作。若是将資源初始化工作放在方法map()中,導緻Mapper任務在解析每一行輸入時都會進行資源初始化工作,導緻重複,程式運作效率不高!
protected void setup(Context context) throws IOExcption,InterruptedException{
super.setup(context)
String jobName = context.getJobName();
//檔案索引值
cartNoIndex = conf.get(jobName + "source.key","7");
//建立hbase連接配接,hbase-site.xml配置檔案需要在jar包中
Configuration config = HBaseConfiguration.create();
connect = new HTable(config,"tableName")
}
protected void map(writable key,Text value,Context context){
if(value == null || value.toString().trim().isEmpty()){
//計數器,記錄處理的條數
context.getCounter(....).increment(1);
}else{
String[] values = Utils.split(value,separator,true);
//業務邏輯處理
int i = Integer.parseInt(cartNoIndex);
if(i<values.length){
cardNo = values[i];
}else{
logger.error("cardNo cannot find");
}
//從hbase中查詢出對應客戶号
String rowkey = HTableManager.generatRowkey(cardNo);
Get getResult = new Get(rowkey.getBytes());
Result rs = connect.get(getResult);
String curNo = Bytes.toString(rs.getValue("f1".getBytes(),"column_name".getBtes());
RedisClient.getRedisClient().zincrbyset("spending:rank",countNum,custNo);
protected void cleanup(context context)throws IOException,InterruptedException{
super.cleanup(context);
connect.close();
}
public static String[] split(String value,String separator,boolean trimSpace){
String[] rtn = split(value.separator);
if(trimSpace && rtn != null){
for(int i=0;i<rtn.length;i++){
rtn[i] = rtn[i].trim();
}
}
return rtn;
}
public static String[] split(String value,String separator){
String[] rtn = null;
if(value != null){
boolean endBlank = false;
if(value.endsWith(separator)){
value +=" ";
endBlank = true;
}
separator = escapeExprSpecialWord(deparator);
if(endBlank){
rtn(rtn.length-1) = "";
}
}
return rtn;
}
public static String escapeExprSpecialWord(String keyWord){
if(keyword != null && !keyword.isEmpty()){
String[] fbsArr = {"\\","|","(",")"};
for(String key : fbsArr){
if(keyword.contains(key){
keyword = keyword.replace(key,"\\"+key);
}
}
}
return keyword;
}