天天看點

MapReduce、Hbase接口API實踐

讀取hdfs中檔案并做處理,取出卡号,通過卡号連接配接hbase查詢出對應客戶号,寫入redis,因為不用輸出,是以不調用context.write方法,整個操作在一個map中便可完成

protected HTable connect
//setup方法被MapReduce架構僅且執行一次,在執行Map任務前,進行相關變量或者資源的集中初始化工作。若是将資源初始化工作放在方法map()中,導緻Mapper任務在解析每一行輸入時都會進行資源初始化工作,導緻重複,程式運作效率不高!
protected void setup(Context context) throws IOExcption,InterruptedException{
    super.setup(context)
    String jobName = context.getJobName();
    //檔案索引值
    cartNoIndex = conf.get(jobName + "source.key","7");
   //建立hbase連接配接,hbase-site.xml配置檔案需要在jar包中
    Configuration config = HBaseConfiguration.create();
    connect = new HTable(config,"tableName")
}

protected void map(writable key,Text value,Context context){
    if(value == null || value.toString().trim().isEmpty()){
    //計數器,記錄處理的條數
    context.getCounter(....).increment(1);
    }else{
        String[] values = Utils.split(value,separator,true);
    //業務邏輯處理
    int i = Integer.parseInt(cartNoIndex);
    if(i<values.length){
        cardNo = values[i];
    }else{
        logger.error("cardNo cannot find");
    }

//從hbase中查詢出對應客戶号
String rowkey = HTableManager.generatRowkey(cardNo);
Get getResult = new Get(rowkey.getBytes());
Result rs = connect.get(getResult);
String curNo = Bytes.toString(rs.getValue("f1".getBytes(),"column_name".getBtes());
RedisClient.getRedisClient().zincrbyset("spending:rank",countNum,custNo);

protected void cleanup(context context)throws IOException,InterruptedException{
  super.cleanup(context);
  connect.close();
}

      
public static String[] split(String value,String separator,boolean trimSpace){
    String[] rtn = split(value.separator);
    if(trimSpace && rtn != null){
        for(int i=0;i<rtn.length;i++){
            rtn[i] = rtn[i].trim();
        }
    }
    return rtn;
}


public static String[] split(String value,String separator){
    String[] rtn = null;
    if(value != null){
        boolean endBlank = false;
        if(value.endsWith(separator)){
            value +=" ";
            endBlank = true;
        }
    separator = escapeExprSpecialWord(deparator);
     if(endBlank){
        rtn(rtn.length-1) = "";
     }
   }
    return rtn;
}


public static String escapeExprSpecialWord(String keyWord){
        if(keyword != null && !keyword.isEmpty()){
            String[] fbsArr = {"\\","|","(",")"};
              for(String key : fbsArr){
                    if(keyword.contains(key){
                        keyword = keyword.replace(key,"\\"+key);
                    }
              }
        }
    return keyword;
}