MapReduce Programming, Part 3: Implementing the Reduce Phase
Source: 程序員人生 · Published: 2014-10-02 08:00:01 · Views: 1882
The Reduce code simply sums up the counts:
package org.freebird.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class LogReducer<Key> extends Reducer<Key, IntWritable, Key, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Key key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();    // each value is the 1 emitted by the mapper
        }
        result.set(sum);
        context.write(key, result);
    }
}
Here the framework guarantees that, before reduce is called, all values belonging to the same key have already been collected into values, forming a pair <key, values>; these pairs are also sorted by key.
Reference: https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Reducer.html
The reducer iterates over values; every value is a 1 emitted by the mapper, so it simply adds them up.
The result is then written to the context. Note that this context is the Context inner class of Reducer.
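The group-by-key-then-sum behavior described above can be sketched locally without a Hadoop cluster. This is an illustrative simulation, not the framework's actual shuffle: the class name ReduceSketch and the sample keys ("did1", "did2") are made up for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSketch {
    // Simulate shuffle + reduce: group (key, 1) pairs by key and sum each
    // group, mirroring what LogReducer.reduce does for one key at a time.
    // A TreeMap keeps keys sorted, as the framework guarantees.
    static Map<String, Integer> reduceAll(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Three mapper outputs: "did1" appears twice, "did2" once.
        List<Map.Entry<String, Integer>> pairs = List.of(
                Map.entry("did1", 1), Map.entry("did2", 1), Map.entry("did1", 1));
        System.out.println(reduceAll(pairs));
    }
}
```

Running this prints the per-key totals in sorted key order, which is exactly the output the real reducer writes via context.write.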
Finally, write a Job class to set up the runtime environment:
package org.freebird;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LogJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);
        job.setMapperClass(org.freebird.mapper.LogMapper.class);
        // Reusing the reducer as a combiner is safe here because summing is
        // associative and commutative.
        job.setCombinerClass(org.freebird.reducer.LogReducer.class);
        job.setReducerClass(org.freebird.reducer.LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}