hadoop概念-MapReduce数据流

云计算 waitig 866℃ 百度已收录 0评论

数据流

术语Job

一个完整的MapReduce作业称作job,它包括三部分:
– 输入数据
– MapReduce程序
– 配置信息

Hadoop工作时会将job分成若干个task:map任务和reduce任务。

The tasks are scheduled using YARN and run on nodes in the cluster. If a task fails, it will be automatically rescheduled to run on a different node.

使用YARN来做任务的调度,这些任务在集群的节点上执行。如果一个任务失败,将自动在不同的节点上重新执行。

这里写图片描述

分片(input split)到Map

Hadoop将MapReduce的输入数据划分为等长的小数据块,称为输入分片(input split)或简称“分片”。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数来处理分片中的每条记录,换句话说:一个分片一个map任务,分片中的一个记录就需要调用一次map函数

拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载平衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,失败的进程或其他同时运行的作业能够实现满意的负载平衡,并且如果分片被切分得更细,负载平衡的会更高。

另一方面,如果分片切分得太小,那么管理分片的总时间和构建map任务的总时间将决定作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块的大小,默认是64MB,不过可以针对集群调整这个默认值(对新建的所有文件),或对新建的每个文件具体指定。

Hadoop在存储有输入数据(HDFS 中的数据)的节点上运行map任务,可以获得最佳性能。这就是所谓的“数据本地化优化”(data locality optimization),因为它无需使用宝贵的集群带宽资源。但是,有时对于一个map任务的输入来说,存储有某个HDFS数据块备份的三个节点可能正在运行其他map任务,此时作业调度需要在三个备份中的某个数据寻求同个机架中空闲的机器来运行该map任务。仅仅在非常偶然的情况下(该情况基本上不会发生),会使用其他机架中的机器运行该map任务,这将导致机架与机架之间的网络传输。

现在我们应该清楚为什么最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。

map的任务输出是写入到本地磁盘而非HDFS的。那么为什么呢?因为map任务输出的是中间结果,一旦map任务完成即会被删除,如果把它存入HDFS中并实现备份容错,未免有点大题小做。如果一个map任务失败,hadoop会在另一个节点重启一个map任务。

流入Reduce

而reduce任务并不具备数据本地化优势——单个reduce任务的输入通常来自所有mapper输出。一般排序过的map输出需要通过网络传输发送到运行reduce任务的节点,并在reduce端进行合并。reduce的输出通常需要存储到HDFS中以实现可靠存储。每个reduce任务将输出数据的第一个复本存储在本地节点,而其它复本则存储到其它节点,因此reduce输出也需要占用网络带宽。

单个Reduce任务的数据流图如下:

这里写图片描述

reduce任务的数量并非由输入数据大小决定,而是特别指定。如有多个reduce任务,则每个map任务都会对其输出进行分区(partition),每个分区对应一个Reduce任务。相同键的记录都会被partition到同一个分区中(但一个分区有多种key)。具体的分区方式可以由用户定义的分区函数来控制,但是默认的partitioner(使用hash函数分桶)工作得很好。

我们把map任务和reduce任务之间的数据流称为shuffle,因为每个reduce任务的输入都来自多个map任务,因此,这个阶段比较复杂,而shuffle过程中的参数调整对job运行的总时间是有非常大的影响,一般MapReduce的调优主要就是调整shuffle阶段的参数

这里写图片描述

Combiner Functions

群上的可用带宽限制了MapReduce的作业数量,因为map的中间结果传递给reduce是要经过网络传输的,因此最重要的一点就是尽量减少map和reduce任务间传输的数据量。不过,Hadoop允许用户针对map任务的输出指定一个合并函数(combiner),用合并函数的输出作为reduce函数的输入,但需要注意,合并函数的运用不应该改变reduce函数的计算结果。

例如有两个map的输出分别是map1={0,20,10};map2={15,25},求最大值,我们可以对先每个map的数据的数据进行合并,合并完成之后再传输给reducer:

 map1={0,20,10}->combiner->{20};
 map2={15,25}->combiner->{25};
 reducer->{25}
 即 max(0,20,10,15,25)=max(max(0,20,10),max(15,25))=25

但需要特别注意的是,并不是任何场景都是可以用combiner的,比如把上面的例子改成求平均值:

combiner后的reducer的结果:       avg(avg(0,20,10),avg(15,25))=avg(10,20)=15;
没有进行combiner的reducer结果:  avg(0,20,10,15,25)=14;

在构建Job时指定combiner函数:

job.setCombinerClass(***.class);

通常Combiner和Reducer是一致的。


本文由【waitig】发表在等英博客
本文固定链接:hadoop概念-MapReduce数据流
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)