• 为了保证你在浏览本网站时有着更好的体验,建议使用类似Chrome、Firefox之类的浏览器~~
    • 如果你喜欢本站的内容何不Ctrl+D收藏一下呢,与大家一起分享各种编程知识~
    • 本网站研究机器学习、计算机视觉、模式识别~当然不局限于此,生命在于折腾,何不年轻时多折腾一下

mapreduce之shuffle操作

bigdata admin 2年前 (2017-09-22) 827次浏览 0个评论 扫描二维码

Shuffle 过程是 MapReduce 的核心,也被称为奇迹发生的地方。要想理解 MapReduce, Shuffle 是必须要了解的。我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混。前段时间在做 MapReduce job 性能调优的工作,需要深入代码研究 MapReduce 的运行机制,这才对 Shuffle 探了个究竟。考虑到之前我在看相关资料而看不懂时很恼火,所以在这里我尽最大的可能试着把 Shuffle 说清楚,让每一位想了解它原理的朋友都能有所收获。如果你对这篇文章有任何疑问或建议请留言到后面,谢谢! Shuffle 的正常意思是洗牌或弄乱,可能大家更熟悉的是 Java API 里的 Collections.shuffle(List)方法,它会随机地打乱参数 list 里的元素顺序。如果你不知道 MapReduce 里 Shuffle 是什么,那么请看这张图:

这张是官方对 Shuffle 过程的描述。但我可以肯定的是,单从这张图你基本不可能明白 Shuffle 的过程,因为它与事实相差挺多,细节也是错乱的。后面我会具体描述 Shuffle 的事实情况,所以这里你只要清楚 Shuffle 的大致范围就成-怎样把 map task 的输出结果有效地传送到 reduce 端。也可以这样理解, Shuffle 描述着数据从 map task 输出到 reduce task 输入的这段过程。
在 Hadoop 这样的集群环境中,大部分 map task 与 reduce task 的执行是在不同的节点上。当然很多情况下 Reduce 执行时需要跨节点去拉取其它节点上的 map task 结果。如果集群正在运行的 job 有很多,那么 task 的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘 IO 对 job 完成时间的影响也是可观的。从最基本的要求来说,我们对 Shuffle 过程的期望可以有:
完整地从 map task 端拉取数据到 reduce 端。
在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
减少磁盘 IO 对 task 执行的影响。
OK,看到这里时,大家可以先停下来想想,如果是自己来设计这段 Shuffle 过程,那么你的设计目标是什么。我想能优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。

我的分析是基于 Hadoop0.21.0 的源码,如果与你所认识的 Shuffle 过程有差别,不吝指出。我会以 WordCount 为例,并假设它有 8 个 map task 和 3 个 reduce task。从上图看出,Shuffle 过程横跨 map 与 reduce 两端,所以下面我也会分两部分来展开。

先看看 map 端的情况,如下图:

上图可能是某个 map task 的运行情况。拿它与官方图的左半边比较,会发现很多不一致。官方图没有清楚地说明 partition, sort 与 combiner 到底作用在哪个阶段。我画了这张图,希望让大家清晰地了解从 map 数据输入到 map 端所有数据准备好的全过程。

整个流程我分了四步。简单些可以这样说,每个 map task 都有一个内存缓冲区,存储着 map 的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个 map task 结束后再对磁盘中这个 map task 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reduce task 来拉数据。

当然这里的每一步都可能包含着多个步骤与细节,下面我对细节来一一说明:
1. 在 map task 执行时,它的输入数据来源于 HDFS 的 block,当然在 MapReduce 概念中,map task 只读取 split。Split 与 block 的对应关系可能是多对一,默认是一对一。在 WordCount 例子里,假设 map 的输入数据都是像“aaa”这样的字符串。

  1. 在经过 mapper 的运行后,我们得知 mapper 的输出是这样一个 key/value 对: key 是“aaa”, value 是数值 1。因为当前 map 端只做加 1 的操作,在 reduce task 里才去合并结果集。前面我们知道这个 job 有 3 个 reduce task,到底当前的“aaa”应该交由哪个 reduce 去做呢,是需要现在决定的。
    MapReduce 提供 Partitioner 接口,它的作用就是根据 key 或 value 及 reduce 的数量来决定当前的这对输出数据最终应该交由哪个 reduce task 处理。默认对 key hash 后再以 reduce task 数量取模。默认的取模方式只是为了平均 reduce 的处理能力,如果用户自己对 Partitioner 有需求,可以订制并设置到 job 上。在我们的例子中,“aaa”经过 Partitioner 后返回 0,也就是这对值应当交由第一个 reducer 来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集 map 结果,减少磁盘 IO 的影响。我们的 key/value 对以及 Partition 的结果都会被写入缓冲区。当然写入之前,key 与 value 值都会被序列化成字节数组。整个内存缓冲区就是一个字节数组,它的字节索引及 key/value 存储结构我没有研究过。如果有朋友对它有研究,那么请大致描述下它的细节吧。
  2. 这个内存缓冲区是有大小限制的,默认是 100MB。当 map task 的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为 Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写 map 结果的线程。溢写线程启动时不应该阻止 map 的结果输出,所以整个缓冲区有个溢写的比例 spill.percent。这个比例默认是 0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这 80MB 的内存,执行溢写过程。Map task 的输出结果还可以往剩下的 20MB 内存中写,互不影响。
    当溢写线程启动后,需要对这 80MB 空间内的 key 做排序(Sort)。排序是 MapReduce 模型默认的行为,这里的排序也是对序列化的字节做的排序。在这里我们可以想想,因为 map task 的输出是需要发送到不同的 reduce 端去,而内存缓冲区没有对将发送到相同 reduce 端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的 reduce 端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个 key/value 对需要发送到某个 reduce 端去,那么需要将这些 key/value 值拼接到一块,减少与 partition 相关的索引记录。在针对每个 reduce 端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于 WordCount 例子,就是简单地统计单词出现的次数,如果在同一个 map task 的结果中有很多个像“aaa”一样出现多次的 key,我们就应该把它们的值合并到一块,这个过程叫 reduce 也叫 combine。但 MapReduce 的术语中,reduce 只指 reduce 端执行从多个 map task 取数据做计算的过程。除 reduce 外,非正式地合并数据只能算做 combine 了。其实大家知道的,MapReduce 中将 Combiner 等同于 Reducer。

    如果 client 设置过 Combiner,那么现在就是使用 Combiner 的时候了。将有相同 key 的 key/value 对的 value 加起来,减少溢写到磁盘的数据量。Combiner 会优化 MapReduce 的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用 Combiner 呢?从这里分析,Combiner 的输出是 Reducer 的输入,Combiner 绝不能改变最终的计算结果。所以从我的想法来看,Combiner 只应该用于那种 Reduce 的输入 key/value 与输出 key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner 的使用一定得慎重,如果用好,它对 job 执行效率有帮助,反之会影响 reduce 的最终结果。

  3. 每次溢写会在磁盘上生成一个溢写文件,如果 map 的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当 map task 真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果 map 的输出结果很少,当 map 执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做 Merge。Merge 是怎样的?如前面的例子,“aaa”从某个 map task 读取过来时值是 5,从另外一个 map 读取时值是 8,因为它们有相同的 key,所以得 merge 成 group。什么是 group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为 merge 是将多个溢写文件合并到一个文件,所以可能也有相同的 key 存在,在这个过程中如果 client 设置过 Combiner,也会使用 Combiner 来合并相同的 key。
    至此,map 端的所有工作都已结束,最终生成的这个文件也存放在 TaskTracker 够得着的某个本地目录内。每个 reduce task 不断地通过 RPC 从 JobTracker 那里获取 map task 是否完成的信息,如果 reduce task 得到通知,获知某台 TaskTracker 上的 map task 执行完成,Shuffle 的后半段过程开始启动。简单地说,reduce task 在执行之前的工作就是不断地拉取当前 job 里每个 map task 的最终结果,然后对从不同地方拉取过来的数据不断地做 merge,也最终形成一个文件作为 reduce task 的输入文件。见下图:


如 map 端的细节图,Shuffle 在 reduce 端的过程也能用图上标明的三点来概括。当前 reduce copy 数据的前提是它要从 JobTracker 获得有哪些 map task 已执行结束,这段过程不表,有兴趣的朋友可以关注下。Reducer 真正运行之前,所有的时间都是在拉取数据,做 merge,且不断重复地在做。如前面的方式一样,下面我也分段地描述 reduce 端的 Shuffle 细节:
1. Copy 过程,简单地拉取数据。Reduce 进程启动一些数据 copy 线程(Fetcher),通过 HTTP 方式请求 map task 所在的 TaskTracker 获取 map task 的输出文件。因为 map task 早已结束,这些文件就归 TaskTracker 管理在本地磁盘中。

  1. Merge 阶段。这里的 merge 如 map 端的 merge 动作,只是数组中存放的是不同 map 端 copy 来的数值。Copy 过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比 map 端的更为灵活,它基于 JVM 的 heap size 设置,因为 Shuffle 阶段 Reducer 不运行,所以应该把绝大部分的内存都给 Shuffle 用。这里需要强调的是,merge 有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的 merge。与 map 端类似,这也是溢写的过程,这个过程中如果你设置有 Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种 merge 方式一直在运行,直到没有 map 端的数据时才结束,然后启动第三种磁盘到磁盘的 merge 方式生成最终的那个文件。
  2. Reducer 的输入文件。不断地 merge 后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为 Reducer 的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,之后的性能优化篇我再说。当 Reducer 的输入文件已定,整个 Shuffle 才最终结束。然后就是 Reducer 执行,把结果放到 HDFS 上。
    上面就是整个 Shuffle 的过程。细节很多,我很多都略过了,只试着把要点说明白。当然,我可能也有理解或表述上的很多问题,不吝指点。我希望不断地完善和修改这篇文章,能让它通俗、易懂,看完就能知道 Shuffle 的方方面面。至于具体的实现原理,各位有兴趣就自己去探索,如果不方便的话,留言给我,我再来研究并反馈。

转载自 http://blog.csdn.net/chenjieit619/article/details/53421202


Deeplearn, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明mapreduce 之 shuffle 操作
喜欢 (0)
admin
关于作者:

您必须 登录 才能发表评论!