`

Hadoop中MapReduce的原理

 
阅读更多

Hadoop中的MapReduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的式并 行处理上T级别的数据集。

一个MapReduce作业(job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序,然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。

MapReduce框架由一个单独的master JobTracker和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

应用程序至少应该指明输入输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(jobconfiguration)。然后,Hadoop的jobclient提交作业(jar包可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

输入与输出:MapReduce框架运转在<key,value>键值对上,也就是说,框架把作业的输入看为是一组<key,value>键值对,同样也产出一组<key,value>键值对做为作业的输出,这两组键值对的类型可能不同。

框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现Writable接口。另外,为了方便框架执行排序操作,key类必须实现WritableComparable接口。一个MapReduce作业的输入和输出类型如下所示:

(input)<k1,v1>->map-><k2,v2>->combine-><k2,v2>->reduce-><k3,v3>(output)

Hadoop中的MapReduce工作流程简介

一般来讲,Hadoop的一个简单的MapReduce任务执行流程如下图所示



1) JobTracker负责分布式环境中实现客户端创建任务并提交。

2) InputFormat模块负责做Map前的预处理,主要包括以下几个工作:验证输入的格式是否符合JobConfig的输入定义,可以是专门定义或者是Writable的子类。将input的文件切分为逻辑上的输入InputSplit,因为在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个较小的block。通过RecordReader来处理经过文件切分为Inputsplit的一组records,输出给Map。因为Inputsplit是逻辑切分的第一步,如何根据文件中的信息来具体切分还需要RecordReader完成。

3) 将RecordReader处理后的结果作为Map的输入,然后Map执行定义的Map逻辑,输出处理后的(key,value)对到临时中间文件。

4) Combiner是可选择的,它的主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。

5) Partitioner也是选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。

6) Reduce执行具体的业务逻辑,即用户编写的处理数据得到结果的业务,并且将处理结果输出给OutputFormat。

7) OutputFormat的作用是,验证输出目录是否已经存在和输出结果类型是否复合Config中配置类型,如果都成立,则输出Reduce汇总后的结果。


Hadoop中MapReduce的任务调度

首先要保证master节点的NameNode,SecondedNameNode,JobTracker和slaves节点的DataNode,TaskTracker都已经启动。通常MapRedcue作业是通过JobClient.rubJob(job)方法向master节点的JobTracker提交的,JobTracker接到JobClient的请求后把其加入作业队列中。JobTracker一直在等待JobClient通过RPC向其提交作业,而TaskTracker一直通过RPC向JobTracker发送心跳信号询问有没有任务可做,如果有,则请求JobTracker派发任务给它执行。如果JobTracker的作业队列不为空,则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一个主动请求的任务:slave的TaskTracker主动向master的JobTracker请求任务。当TaskTracker接到任务后,通过自身调度在本slave建立起Task,执行任务。下图是MapReduce任务请求调度的过程示意图:


具体来说,这个过程包括两个步骤:

1).JobClient提交作业

JobClient.runJob(job)静态方法会实例化一个JobClient的实例,然后用此实例的submitJob(job)方法向JobTracker提交作业。此方法会返回一个RunningJob对象,它用来跟踪作业的状态。作业提交完毕后,JobClient会根据此对象开始关注作业的进度,直到作业完成。submitJob(job)内部是通过调用submitJobInternal(job)方法完成实质性的作业提交的。submitJobInternal(job)方法首先会向hadoop分布系统文件系统(HDFS)依次上传三个文件:job.jar,job.split和job.xml。job.jar里面包含了执行此任务需要的各种类,比如Mapper,Reducer等实现;job.split是文件分块的相关信息,比如有数据分多少个块,块的大小(默认64M)等。job.xml是有关的作业配置,例如Mapper,Combiner,Reducer的类型,输入输出格式的类型等。

2).JobTacker调度作业

JobTracker接到JobClient提交的作业后,即在JobTracker.submitJob(job)方法中,首先产生一个JobInProgress对象。此对象代表一道作业,它的作用是维护这道作业的所有信息,包括作业相关信息JobProfile和最近作业状态JobStatus,并将作业所有规划的Task登记到任务列表中。随后JobTracker将此JobInProgress对象通过listener.jobAdded(job)方法加入到调度队列中,并用一个成员变量jobs来维护所有的作业。然后等到有TaskTracker空闲,使用JobTracker.AssignTask(tasktracker)来请求任务,如果调度队列不空,程序便通过调度算法取出一个task交给来请求的TaskTracker去执行。至此,整个任务分配过程基本完成,各个类的相互关系和依赖性如图5.5所示:





分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics