天天看點

Hadoop的word co-occurrence實作

Word Co-occurrence一直不知道該怎麼正确翻譯, 單詞相似度?還是共生單詞?還是單詞的共生矩陣?

這在統計裡面是很常用的文本處理算法,用來度量一組文檔集中所有出現頻率最接近的詞組.嗯,其實是上下文詞組,不是單詞.算是一個比較常用的算法,可以衍生出其他的統計算法.能用來做推薦,因為它能夠提供的結果是"人們看了這個,也會看那個".比如做一些協同過濾之外的購物商品的推薦,信用卡的風險分析,或者是計算大家都喜歡什麼東西.

比如 I love you , 出現 "I love" 的同時往往伴随着 "love you" 的出現,不過中文的處理跟英文不一樣,需要先用分詞庫做預處理.

按照Mapper, Reducer和Driver的方式拆分代碼

Mapper程式:

package wco;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    
    /*
     * 将行内容全部轉換為小寫格式.
     */
    String line_lc = value.toString().toLowerCase();
    String before = null;
    
    /*
     *  将行拆分成單詞
     *  并且key是前一個單詞加上後一個單詞
     *  value 是 1
     */
    for (String word : line_lc.split("\\W+")) { //循環行内容,按照空格進行分割單詞
      if (word.length() > 0) {
        if (before != null) { //如果前詞不為空,則寫入上下文(第一次前詞一定是空,直接跳到下面的before = word)
          context.write(new Text(before + "," + word), new IntWritable(1));
        }
        before = word; //将現詞指派給前詞
      }
    }
  }
}      

Reducer程式:

package wco;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int wordCount = 0;
    for (IntWritable value : values) {
      wordCount += value.get(); //單純計算word count
    }
    context.write(key, new IntWritable(wordCount));
  }
}      
package wco;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WCo extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {

    if (args.length != 2) {
      System.out.printf("Usage: hadoop jar wco.WCo <input> <output>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(WCo.class);
    job.setJobName("Word Co Occurrence");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(WCoMapper.class);
    job.setReducerClass(WCoReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new WCo(), args);
    System.exit(exitCode);
  }
}      

繼續閱讀