这几天看了一些关于Hadoop MapReduce的Shuffling过程的资料,弄明白这个过程,总结一下。介于自己之前没有接触过hadoop和分布式计算这个领域,理解或许有些误,欢迎拍砖。

先看一下map和reduce函数的输入和输出格式:

map: <k1, v1> => <k2, v2>
reduce: <k1, [v1, v2, ...]> => <k2, v3>

map的输出和reduce的输入数据格式是不同的,一个是key-value, 另外一个是key-list。

因此map和reduce函数中间需要有一个过程做这种转换,这个过程在hadoop中就叫Shuffle,也是“magic”发生的地方。

Shuffle包含多个步骤,在mapper和reducer端都有涉及。 看下面的shuffling过程图:

e87713375b9e0d30ecfbb87f4f3372a3.png

Mapper端

Mapper端最后输出结果:磁盘上的一个sorted partitioned data。

map函数接收hadoop划分好的splited data, 输出一个个key-value对后,会有下面3个步骤:

  1. partition
  2. spill, sort
  3. merge

最后在磁盘输出一个sorted partitioned data文件。

对于map得到的每个key-value对,很自然地想到,这个key-value都交给哪个reducer处理。这项工作是由Partitioner接口负责的,默认是hash(key)%R哈希函数分散到各个reducer去。

经过partitioner处理后,每个key-value对都得到分配到的reduecer信息,分配相同的reducer属于相同的partition,然后把记录先写入内存(In-memory buffer)。内存分配图如下:
b1759dca164df38d73e4c38e17d6f73d.png

内存会划分一个个partition,每个partition都交个独自一个reduecer处理(总的partition数目=reducer数量),

当buffer达到一定阀值(默认80M),就开始执行spill步骤,即分成小文件写入磁盘。在写之前,先对memory中每个partition进行排序(in-memory sort)。这个步骤会产生很多个spilled文件,如果处理的数据量大的话。

最后一个步骤就是sort,准确来说是merge,把spill步骤产生的所有spilled files merge成一个大的已排序文件。merge是相同的partition之间进行,因为每个partition的数据已经是有序的,所有类似归并排序中的merge,merge后的数据格式就是

写入磁盘前有可能还会进行combine操作(如果developer设置了combine操作),combine操作作用相当于reduce,把mapper端的key,[value, …]先处理,变成<key, [value1]>(如<a, [1, 1, 1]> => <a, 3>), 这样能有效减少最后写入磁盘文件的大小,网络需要传输的大小更少,因此更快。

Reducer端

  1. fetch
  2. sort(merge)
  3. reduce

所有mapper处理完后,reducer收到jobtracker发来的信息。

第一步fetch,从各个mapper处拉数据,reducer会开启一些cpy线程从不同的mapper端拉取数据,通过http协议。拉取的数据先放在内存,如果超过一定阀值,就保存在磁盘上,和mapper端类似。

第二步merge,开始对数据开始merge,(为什么要merge?合并不同的mapper端拉来的partitions成为一个大的有序partition),最后的数据格式就是, 即reduce函数需要的格式。

merge完毕以后,开始执行reduce函数。

举个例子可能更容易理解:

以word count为例:
假设各有两个map和reduce机器,输入文件被分为两个splits,看下面两个图:

c9c657f294cc31b7eb5c7d51796f8c8b.png
c0603ed2bede9d207b3857ee90970e96.png

map1端(位于图上面)中的map输出一个个<key, value>对:<a, 1>, <a,1>, <b,1>, …,后,每个对会经过Partitioner处理,Partitioner类根据key划分到不同的partition区域(partition是分配给reducer处理的数据段),如a,b分到partition1, c,d分配到partition2, 处理完所有key-value对后,对每个partition内部进行排序,写入磁盘,成一个大的partitioned,sorted文件,等待reducer来拉取处理。

当所有map机器完成所有操作后,reduer1机器收到通知,通过http拉取各个partition1的数据,然后进行merge,最后得到key,[value]的数据格式。

全文说的都是总的大过程步骤,用来初学者理解shuffle过程的,里面很多细节还没有深入。

Reference

  1. https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-6/shuffle-and-sort
  2. map execution
  3. reduce execution
  4. http://www.eurecom.fr/~michiard/teaching/slides/clouds/tutorial-mapreduce.pdf
  5. http://hadoop.apache.org/docs/stable/mapred_tutorial.html#Counters
  6. http://langyu.iteye.com/blog/992916?page=3#comments