`

Hadoop 使用Combiner提高Map/Reduce程序效率

 
阅读更多

众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。

在上述过程中,我们看到至少两个性能瓶颈:

  1. 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  2. 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网

络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。

如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:

Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。

由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。

代码如下:

  1. packagecom;
  2. importjava.io.IOException;
  3. importorg.apache.hadoop.conf.Configuration;
  4. importorg.apache.hadoop.conf.Configured;
  5. importorg.apache.hadoop.fs.Path;
  6. importorg.apache.hadoop.io.DoubleWritable;
  7. importorg.apache.hadoop.io.LongWritable;
  8. importorg.apache.hadoop.io.Text;
  9. importorg.apache.hadoop.mapreduce.Job;
  10. importorg.apache.hadoop.mapreduce.Mapper;
  11. importorg.apache.hadoop.mapreduce.Reducer;
  12. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. importorg.apache.hadoop.util.Tool;
  17. importorg.apache.hadoop.util.ToolRunner;
  18. publicclassAveragingWithCombinerextendsConfiguredimplementsTool{
  19. publicstaticclassMapClassextendsMapper<LongWritable,Text,Text,Text>{
  20. staticenumClaimsCounters{MISSING,QUOTED};
  21. //MapMethod
  22. publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
  23. Stringfields[]=value.toString().split(",",-20);
  24. Stringcountry=fields[4];
  25. StringnumClaims=fields[8];
  26. if(numClaims.length()>0&&!numClaims.startsWith("\"")){
  27. context.write(newText(country),newText(numClaims+",1"));
  28. }
  29. }
  30. }
  31. publicstaticclassReduceextendsReducer<Text,Text,Text,DoubleWritable>{
  32. //ReduceMethod
  33. publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
  34. doublesum=0;
  35. intcount=0;
  36. for(Textvalue:values){
  37. Stringfields[]=value.toString().split(",");
  38. sum+=Double.parseDouble(fields[0]);
  39. count+=Integer.parseInt(fields[1]);
  40. }
  41. context.write(key,newDoubleWritable(sum/count));
  42. }
  43. }
  44. publicstaticclassCombineextendsReducer<Text,Text,Text,Text>{
  45. //ReduceMethod
  46. publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
  47. doublesum=0;
  48. intcount=0;
  49. for(Textvalue:values){
  50. Stringfields[]=value.toString().split(",");
  51. sum+=Double.parseDouble(fields[0]);
  52. count+=Integer.parseInt(fields[1]);
  53. }
  54. context.write(key,newText(sum+","+count));
  55. }
  56. }
  57. //runMethod
  58. publicintrun(String[]args)throwsException{
  59. //CreateandRuntheJob
  60. Jobjob=newJob();
  61. job.setJarByClass(AveragingWithCombiner.class);
  62. FileInputFormat.addInputPath(job,newPath(args[0]));
  63. FileOutputFormat.setOutputPath(job,newPath(args[1]));
  64. job.setJobName("AveragingWithCombiner");
  65. job.setMapperClass(MapClass.class);
  66. job.setCombinerClass(Combine.class);
  67. job.setReducerClass(Reduce.class);
  68. job.setInputFormatClass(TextInputFormat.class);
  69. job.setOutputFormatClass(TextOutputFormat.class);
  70. job.setOutputKeyClass(Text.class);
  71. job.setOutputValueClass(Text.class);
  72. System.exit(job.waitForCompletion(true)?0:1);
  73. return0;
  74. }
  75. publicstaticvoidmain(String[]args)throwsException{
  76. intres=ToolRunner.run(newConfiguration(),newAveragingWithCombiner(),args);
  77. System.exit(res);
  78. }
  79. }
分享到:
评论

相关推荐

    提高hadoop的mapreduce job效率笔记

    修改mapper和reducer数量,如何使用combiner,什么时候该选择哪个writeable等。资料里很详细说明了。

    Hadoop实战中文版

    3.1 HDFS 文件操作 3.1.1 基本文件命令 3.1.2 编程读写HDFS 3.2 剖析MapReduce 程序 3.2.1 Hadoop数据类型 3.2.2 Mapper 3.2.3 Reducer 3.2.4 Partitioner:重定向Mapper输出 3.2.5 Combiner:本地reduce ...

    MapReduce分布式计算平台编程示例

    3 Hadoop MapReduce平台使用 5 3.1 streaming介绍 5 3.2 C语言Map-Reduce程序示例 6 3.2.1计算任务 6 3.2.2 Mapper算法设计 7 3.2.3 Reducer算法设计 8 3.2.4 作业提交命令 9 3.3 shell Map-Reduce程序示例 9 3.3.1...

    Hadoop实战中文版.PDF

    524.1.1 专利引用数据 534.1.2 专利描述数据 544.2 构建MapReduce程序的基础模板 554.3 计数 604.4 适应Hadoop API的改变 644.5 Hadoop的Streaming 674.5.1 通过Unix命令使用Streaming 684.5.2 通过...

    Hadoop实战(第2版)

    4.4 本章小结5 优化HDFS 处理大数据的技术5.1 处理小文件技术点24 使用Avro 存储大量小文件5.2 通过压缩提高数据存储效率技术点25 选择合适的压缩解码器技术点26 在HDFS、MapReduce、Pig 和Hive 中使用...

    hadoop 权威指南(第三版)英文版

    hadoop权威指南第三版(英文版)。 Foreword . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xiii Preface . . . . . . ....

    Hadoop实战(陆嘉恒)译

    map侧过滤后在reduce侧联结5.3 创建一个Bloom filter5.3.1 Bloom filter做了什么5.3.2 实现一个Bloom filter5.3.3 Hadoop 0.20 以上版本的Bloom filter5.4 温故知新5.5 小结5.6 更多资源第6 章 编程实践6.1 开发...

    Hadoop实战

    313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:重定向Mapper输出 413.2.5 Combiner:本地reduce 433.2.6 预定...

    hadoop_the_definitive_guide_3nd_edition.pdf

    Map and Reduce Java MapReduce Scaling Out Data Flow Combiner Functions Running a Distributed MapReduce Job Hadoop Streaming Ruby Python import import import import import import org.apache.hadoop.fs...

    forcombiner_reduce_java_mapReduce_markizj_yourselfarq_源码

    Map Reduce中的Combiner就是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少...

    2018最新BAT大数据面试题.docx

    3)很多人的误解在 Map 阶段,如果不使用 Combiner便不会排序,这是错误的,不管你用不用 Combiner,Map Task 均会对产生的数据排序(如果没有 Reduce Task,则不会排序,实际上 Map 阶段的排序就是为了减轻 Reduce...

    hadoop倒排索引实现 完整代码+报告

    Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...

    MapReduce单词统计 hadoop集群

    Map阶段 采集数据 Combiner阶段 合并数据 Reduce阶段 最终处理,进行排序等自定义操作 每个阶段都会打印对应的数据处理情况,在Map阶段打印每一次读取切割之后的每个单词内容;在Combiner阶段打印单个分片里的单词...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    5.2 通过压缩提高数据存储效率 技术点25 选择合适的压缩解码器 技术点26 在HDFS、MapReduce、Pig 和Hive 中使用数据压缩 技术点27 在MapReduce、Hive 和Pig 中处理可分割的LZOP 5.3 本章小结 6 诊断和优化...

    hadoop_the_definitive_guide_3nd_edition

    Hadoop definitive 第三版, 目录如下 1. Meet Hadoop . . . 1 Data! 1 Data Storage and Analysis 3 Comparison with Other Systems 4 RDBMS 4 Grid Computing 6 Volunteer Computing 8 A Brief History of Hadoop 9...

    大数据开发笔试.docx

    这些按照时间顺序包括:输入分片(input split)、map阶段、combiner阶段、shuffle阶段和reduce阶段。(5个阶段) 3、map方法是如何调用reduce方法的 答:Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方,...

    Hadoop二次开发必懂(下)

    Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己...

    Hadoop中MapReduce基本案例及代码(四)

    就相当于map后多reduce几次。 排序 如果想将mapreduce结果排序,需将排序对象作为键值。 案例:将利润求和后按照顺序排序 数据源 profit.txt 编号 | 姓名 | 收入 | 支出 1 ls 2850 100 2 ls 3566 200 3 ls 4555 323 ...

Global site tag (gtag.js) - Google Analytics