众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。
在上述过程中,我们看到至少两个性能瓶颈:
- 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
- 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。
Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网
络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。
如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:
Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。
由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。
代码如下:
-
packagecom;
-
-
importjava.io.IOException;
-
-
importorg.apache.hadoop.conf.Configuration;
-
importorg.apache.hadoop.conf.Configured;
-
importorg.apache.hadoop.fs.Path;
-
importorg.apache.hadoop.io.DoubleWritable;
-
importorg.apache.hadoop.io.LongWritable;
-
importorg.apache.hadoop.io.Text;
-
importorg.apache.hadoop.mapreduce.Job;
-
importorg.apache.hadoop.mapreduce.Mapper;
-
importorg.apache.hadoop.mapreduce.Reducer;
-
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
importorg.apache.hadoop.util.Tool;
-
importorg.apache.hadoop.util.ToolRunner;
-
-
publicclassAveragingWithCombinerextendsConfiguredimplementsTool{
-
-
publicstaticclassMapClassextendsMapper<LongWritable,Text,Text,Text>{
-
-
staticenumClaimsCounters{MISSING,QUOTED};
-
-
publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
-
Stringfields[]=value.toString().split(",",-20);
-
Stringcountry=fields[4];
-
StringnumClaims=fields[8];
-
-
if(numClaims.length()>0&&!numClaims.startsWith("\"")){
-
context.write(newText(country),newText(numClaims+",1"));
-
}
-
}
-
}
-
-
publicstaticclassReduceextendsReducer<Text,Text,Text,DoubleWritable>{
-
-
-
publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
-
doublesum=0;
-
intcount=0;
-
for(Textvalue:values){
-
Stringfields[]=value.toString().split(",");
-
sum+=Double.parseDouble(fields[0]);
-
count+=Integer.parseInt(fields[1]);
-
}
-
context.write(key,newDoubleWritable(sum/count));
-
}
-
}
-
-
publicstaticclassCombineextendsReducer<Text,Text,Text,Text>{
-
-
-
publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
-
doublesum=0;
-
intcount=0;
-
for(Textvalue:values){
-
Stringfields[]=value.toString().split(",");
-
sum+=Double.parseDouble(fields[0]);
-
count+=Integer.parseInt(fields[1]);
-
}
-
context.write(key,newText(sum+","+count));
-
}
-
}
-
-
-
publicintrun(String[]args)throwsException{
-
-
Jobjob=newJob();
-
job.setJarByClass(AveragingWithCombiner.class);
-
-
FileInputFormat.addInputPath(job,newPath(args[0]));
-
FileOutputFormat.setOutputPath(job,newPath(args[1]));
-
-
job.setJobName("AveragingWithCombiner");
-
job.setMapperClass(MapClass.class);
-
job.setCombinerClass(Combine.class);
-
job.setReducerClass(Reduce.class);
-
job.setInputFormatClass(TextInputFormat.class);
-
job.setOutputFormatClass(TextOutputFormat.class);
-
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
-
System.exit(job.waitForCompletion(true)?0:1);
-
return0;
-
}
-
-
publicstaticvoidmain(String[]args)throwsException{
-
intres=ToolRunner.run(newConfiguration(),newAveragingWithCombiner(),args);
-
System.exit(res);
-
}
-
-
}
分享到:
相关推荐
修改mapper和reducer数量,如何使用combiner,什么时候该选择哪个writeable等。资料里很详细说明了。
3.1 HDFS 文件操作 3.1.1 基本文件命令 3.1.2 编程读写HDFS 3.2 剖析MapReduce 程序 3.2.1 Hadoop数据类型 3.2.2 Mapper 3.2.3 Reducer 3.2.4 Partitioner:重定向Mapper输出 3.2.5 Combiner:本地reduce ...
3 Hadoop MapReduce平台使用 5 3.1 streaming介绍 5 3.2 C语言Map-Reduce程序示例 6 3.2.1计算任务 6 3.2.2 Mapper算法设计 7 3.2.3 Reducer算法设计 8 3.2.4 作业提交命令 9 3.3 shell Map-Reduce程序示例 9 3.3.1...
524.1.1 专利引用数据 534.1.2 专利描述数据 544.2 构建MapReduce程序的基础模板 554.3 计数 604.4 适应Hadoop API的改变 644.5 Hadoop的Streaming 674.5.1 通过Unix命令使用Streaming 684.5.2 通过...
4.4 本章小结5 优化HDFS 处理大数据的技术5.1 处理小文件技术点24 使用Avro 存储大量小文件5.2 通过压缩提高数据存储效率技术点25 选择合适的压缩解码器技术点26 在HDFS、MapReduce、Pig 和Hive 中使用...
hadoop权威指南第三版(英文版)。 Foreword . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xiii Preface . . . . . . ....
map侧过滤后在reduce侧联结5.3 创建一个Bloom filter5.3.1 Bloom filter做了什么5.3.2 实现一个Bloom filter5.3.3 Hadoop 0.20 以上版本的Bloom filter5.4 温故知新5.5 小结5.6 更多资源第6 章 编程实践6.1 开发...
313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:重定向Mapper输出 413.2.5 Combiner:本地reduce 433.2.6 预定...
Map and Reduce Java MapReduce Scaling Out Data Flow Combiner Functions Running a Distributed MapReduce Job Hadoop Streaming Ruby Python import import import import import import org.apache.hadoop.fs...
Map Reduce中的Combiner就是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少...
3)很多人的误解在 Map 阶段,如果不使用 Combiner便不会排序,这是错误的,不管你用不用 Combiner,Map Task 均会对产生的数据排序(如果没有 Reduce Task,则不会排序,实际上 Map 阶段的排序就是为了减轻 Reduce...
Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...
Map阶段 采集数据 Combiner阶段 合并数据 Reduce阶段 最终处理,进行排序等自定义操作 每个阶段都会打印对应的数据处理情况,在Map阶段打印每一次读取切割之后的每个单词内容;在Combiner阶段打印单个分片里的单词...
5.2 通过压缩提高数据存储效率 技术点25 选择合适的压缩解码器 技术点26 在HDFS、MapReduce、Pig 和Hive 中使用数据压缩 技术点27 在MapReduce、Hive 和Pig 中处理可分割的LZOP 5.3 本章小结 6 诊断和优化...
Hadoop definitive 第三版, 目录如下 1. Meet Hadoop . . . 1 Data! 1 Data Storage and Analysis 3 Comparison with Other Systems 4 RDBMS 4 Grid Computing 6 Volunteer Computing 8 A Brief History of Hadoop 9...
这些按照时间顺序包括:输入分片(input split)、map阶段、combiner阶段、shuffle阶段和reduce阶段。(5个阶段) 3、map方法是如何调用reduce方法的 答:Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方,...
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己...
就相当于map后多reduce几次。 排序 如果想将mapreduce结果排序,需将排序对象作为键值。 案例:将利润求和后按照顺序排序 数据源 profit.txt 编号 | 姓名 | 收入 | 支出 1 ls 2850 100 2 ls 3566 200 3 ls 4555 323 ...