在这里插入图片描述

【作者主页】Francek Chen
【专栏介绍】 ⌈ ⌈ 大数据技术原理与应用 ⌋ ⌋ 专栏系统介绍大数据的相关知识,分为大数据基础篇、大数据存储与管理篇、大数据处理与分析篇、大数据应用篇。内容包含大数据概述、大数据处理架构Hadoop、分布式文件系统HDFS、分布式数据库HBase、NoSQL数据库、云数据库、MapReduce、Hadoop再探讨、数据仓库Hive、Spark、流计算、Flink、图计算、数据可视化,以及大数据在互联网领域、生物医学领域的应用和大数据的其他应用。
【GitCode】专栏资源保存在我的GitCode仓库:https://gitcode.com/Morse_Chen/BigData_principle_application


理解 MapReduce 的工作流程,是开展 MapReduce 编程的前提。本节首先给出工作流程概述,并阐述 MapReduce 的各个执行阶段,最后对 MapReduce 的核心环节—Shuffle 过程进行剖析。

一、工作流程概述

大规模数据集的处理包括分布式存储和分布式计算两个核心环节。谷歌用分布式文件系统 GFS 实现分布式数据存储,用 MapReduce 实现分布式计算;而 Hadoop 使用分布式文件系统 HDFS 实现分布式数据存储,用 Hadoop MapReduce 实现分布式计算。MapReduce 的输入和输出都需要借助于分布式文件系统进行存储,这些文件被分布存储到集群中的多个节点上。

MapReduce 的核心思想可以用“分而治之”来描述,如图1所示,也就是把一个大的数据集拆分成多个小数据集在多台机器上并行处理。也就是说,一个大的 MapReduce 作业,首先会被拆分成许多个 Map 任务在多台机器上并行执行,每个 Map 任务通常运行在数据存储的节点上。这样计算和数据就可以放在一起运行,不需要额外的数据传输开销。当 Map 任务结束后,会生成以<key,value>形式的许多中间结果。然后,这些中间结果会被分发到多个 Reduce 任务在多台机器上并行执行,具有相同 key 的<key,value>会被发送到同一个 Reduce 任务,Reduce 任务会对中间结果进行汇总计算得到最后结果,并输出到分布式文件系统。

在这里插入图片描述

图1 MapReduce工作流程

需要指出的是,不同的 Map 任务之间不会进行通信,不同的 Reduce 任务之间也不会发生任何信息交换;用户不能显式地从一台机器向另一台机器发送消息,所有的数据交换都是通过 MapReduce 框架自身去实现的。

在 MapReduce 的整个执行过程中,Map 任务的输入文件、Reduce 任务的处理结果都是保存在分布式文件系统中的,而 Map 任务处理得到的中间结果保存在本地存储中(如磁盘)。另外,只有当 Map 处理全部结束后,Reduce 过程才能开始;只有 Map 才需要考虑数据局部性,实现“计算向数据靠拢”,Reduce 则无须考虑数据局部性。

二、MapReduce 的各个执行阶段

下面是一个 MapReduce 算法的执行过程。

(1)MapReduce 框架使用 InputFormat 模块做 Map 前的预处理,比如验证输入的格式是否符合输入定义;然后,将输入文件切分为逻辑上的多个 InputSplit。InputSplit 是 MapReduce 对文件进行处理和运算的输入单位,只是一个逻辑概念,每个 InputSplit 并没有对文件进行实际切分,只是记录了要处理的数据的位置和长度。

(2)因为 InputSplit 是逻辑切分而非物理切分,所以还需要通过 RecordReader(RR)根据 InputSplit 中的信息来处理 InputSplit 中的具体记录,加载数据并将其转换为适合 Map 任务读取的键值对,输入给 Map 任务。

(3)Map 任务会根据用户自定义的映射规则,输出一系列的<key,value>作为中间结果。

(4)为了让 Reduce 可以并行处理 Map 的结果,需要对 Map 的输出进行一定的分区(Partition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到<key,value-list>形式的中间结果,再交给对应的 Reduce 来处理,这个过程称为 Shuffle。从无序的<key,value>到有序的<key,value-list>,这个过程用 Shuffle 来称呼是非常形象的。

(5)Reduce 以一系列<key,value-list>中间结果作为输入,执行用户定义的逻辑,输出结果交给 OutputFormat 模块。

(6)OutputFormat 模块会验证输出目录是否已经存在,以及输出结果类型是否符合配置文件中的配置类型,如果都满足,就输出 Reduce 的结果到分布式文件系统。

MapReduce 工作流程中的各个执行阶段,具体如图2所示。

在这里插入图片描述

图2 MapReduce工作流程中的各个执行阶段

三、Shuffle 过程详解

Shuffle 过程是 MapReduce 整个工作流程的核心环节,理解 Shuffle 过程的基本原理,对于理解 MapReduce 流程至关重要。

(一)Shuffle 过程简介

Shuffle 是指对 Map 任务输出结果进行分区、排序、合并、归并等处理并交给 Reduce 的过程。因此,Shuffle 过程分为 Map 端的操作和 Reduce 端的操作,如图3所示,主要执行以下操作。

在这里插入图片描述

图3 Shuffle过程

(1)在 Map 端的 Shuffle 过程。

Map 任务的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要对缓存中的数据进行分区,然后对每个分区的数据进行排序和合并,再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着 Map 任务的执行,磁盘中就会生成多个溢写文件。在 Map 任务全部结束之前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的 Reduce 任务来“领取”属于自己处理的数据。

(2)在 Reduce 端的 Shuffle 过程。

Reduce 任务从 Map 端的不同 Map 机器“领取”属于自己处理的那部分数据,然后对数据进行归并后交给 Reduce 处理。

(二)Map 端的 Shuffle 过程

Map 端的 Shuffle 过程包括 4 个步骤,如图4所示。

在这里插入图片描述

图4 Map端的Shuffle过程

(1)输入数据和执行 Map 任务。

Map 任务的输入数据一般保存在分布式文件系统(如 GFS 或 HDFS)的文件块中,这些文件块的格式是任意的,可以是文档格式,也可以是二进制格式。Map 任务接收<key, value>作为输入后,按一定的映射规则将其转换成多个<key, value>输出。

(2)写入缓存。

每个 Map 任务都会被分配一个缓存,Map 任务的输出结果不是立即写入磁盘,而是首先写入缓存。在缓存中积累一定数量的 Map 任务输出结果以后,再一次性批量写入磁盘,这样可以大大减少对磁盘 I/O 的影响。因为磁盘包含机械部件,它是通过磁头移动和盘片的转动来寻址定位数据的,每次寻址的开销很大,如果每个 Map 任务输出结果都直接写入磁盘,会引入很多次寻址开销,而一次性批量写入,就只需要一次寻址,连续写入,大大降低了开销。需要注意的是,在写入缓存之前,key 与 value 都会被序列化成字节数组。

(3)溢写(分区、排序和合并)。

提供给 MapReduce 的缓存的容量是有限的,默认大小是 100 MB。随着 Map 任务的执行,缓存中 Map 任务结果的数量会不断增加,很快占满整个缓存。这时,就必须启动溢写(Spill)操作,把缓存中的内容一次性写入磁盘,并清空缓存。溢写的过程通常是由另外一个单独的后台线程来完成的,不会影响 Map 结果往缓存写入。但是为了保证 Map 结果能够持续写入缓存,不受溢写过程的影响,就必须让缓存中一直有可用的空间,不能等到全部占满才启动溢写过程,所以一般会设置一个溢写比例,如 0.8。也就是说,当 100 MB 大小的缓存被填入 80 MB 数据时,就启动溢写过程,把已经写入的 80 MB 数据写入磁盘,剩余 20 MB 空间供 Map 结果继续写入。

但是,在溢写到磁盘之前,缓存中的数据首先会被分区。缓存中的数据是<key, value>形式的键值对,这些键值对最终需要交给不同的 Reduce 任务进行并行处理。MapReduce 通过 Partitioner 接口对这些键值对进行分区,默认的分区方式是先采用 Hash 函数对 key 进行哈希,再对 Reduce 任务的数量进行取模,可以表示成 hash(key) mod R,其中 R 表示 Reduce 任务的数量。这样,就可以把 Map 任务输出结果均匀地分配给这 R 个 Reduce 任务去并行处理了。当然,MapReduce 也允许用户通过重载 Partitioner 接口来自定义分区方式。

对于每个分区内的所有键值对,后台线程会根据 key 对它们进行内存排序,排序是 MapReduce 的默认操作。排序结束后,还有一个可选的合并操作。如果用户事先没有定义 Combiner 函数,就不用进行合并操作。如果用户事先定义了 Combiner 函数,则这个时候会执行合并操作,从而减少需要溢写到磁盘的数据量。

“合并”是指将那些具有相同 key 的<key,value>的 value 加起来。比如有两个键值对<“xmu”,1>和<“xmu”,1>,经过合并操作以后就可以得到一个键值对<“xmu”,2>,减少键值对的数量。这里需要注意,Map 端的这种合并操作,其实和 Reduce 的功能相似,但是由于这个操作发生在 Map 端,所以我们只能称之为“合并”,从而有别于 Reduce。不过,并非所有场合都可以使用 Combiner,因为 Combiner 的输出是 Reduce 任务的输入,Combiner 绝不能改变 Reduce 任务最终的计算结果。一般而言,累加、最大值等场景可以使用合并操作。

经过分区、排序以及可能发生的合并操作之后,这些缓存中的键值对可以被写入磁盘,并清空缓存。每次溢写操作都会在磁盘中生成一个新的溢写文件,写入溢写文件中的所有键值对都是
经过分区和排序的。

(4)文件归并。

每次溢写操作都会在磁盘中生成一个新的溢写文件,随着 MapReduce 任务的进行,磁盘中的溢写文件数量会越来越多。当然,如果 Map 任务输出结果很少,磁盘上只会存在一个溢写文件,但是通常都会存在多个溢写文件。最终,在 Map 任务全部结束之前,系统会对所有溢写文件中的数据进行归并,生成一个大的溢写文件,这个大的溢写文件中的所有键值对也是经过分区和排序的。

“归并”是指具有相同 key 的键值对会被归并成一个新的键值对。具体而言,若干个具有相同 key 的键值对< k 1 k_1 k1, v 1 v_1 v1>,< k 1 k_1 k1, v 2 v_2 v2>,…,< k 1 k_1 k1, v n v_n vn>会被归并成一个新的键值对< k 1 k_1 k1,< v 1 v_1 v1, v 2 v_2 v2,…, v n v_n vn>>。

另外,进行文件归并时,如果磁盘中已经生成的溢写文件的数量超过参数min.num.spills.for.combine 的值时(默认值是 3,用户可以修改),那么,就可以再次运行 Combiner,对数据进行合并操作,从而减少写入磁盘的数据量。但是,如果磁盘中只有一两个溢写文件,执行合并操作就会“得不偿失”,因为执行合并操作本身也需要代价,因此不需要运行 Combiner。

经过上述 4 个步骤以后,Map 端的 Shuffle 过程全部完成,最终生成一个会被存放在本地磁盘上的大文件。这个大文件中的数据是被分区的,不同的分区会被发送到不同的 Reduce 任务进行并行处理。JobTracker 会一直监测 Map 任务的执行,当监测到一个 Map 任务完成后,会立即通知相关的 Reduce 任务来“领取”数据,然后开始 Reduce 端的 Shuffle 过程。

(三)Reduce 端的 Shuffle 过程

相对于 Map 端而言,Reduce 端的 Shuffle 过程非常简单,只需要从 Map 端读取 Map 任务结果,然后执行归并操作,最后输送给 Reduce 任务进行处理。具体而言,Reduce 端的 Shuffle 过程包括 3 个步骤,如图5所示。

在这里插入图片描述

图5 Reduce端的Shuffle过程

(1)“领取”数据。

Map 端的 Shuffle 过程结束后,所有 Map 任务输出结果都保存在 Map 机器的本地磁盘上,Reduce 任务需要把这些数据“领取”(Fetch)回来存放到自己所在机器的本地磁盘上。因此,在每个 Reduce 任务真正开始之前,它大部分时间都在从 Map 端“领取”属于自己处理的那些分区的数据。每个Reduce任务会不断地通过 RPC向 JobTracker询问Map任务是否已经完成;JobTracker 监测到一个 Map 任务完成后,就会通知相关的 Reduce 任务来“领取”数据;一旦一个 Reduce 任务收到 JobTracker 的通知,它就会到该 Map 任务所在机器上把属于自己处理的分区数据领取到本地磁盘中。

(2)归并数据。

从 Map 端“领取”的数据会被存放在 Reduce 任务所在机器的缓存中,如果缓存被占满,就会像 Map 端一样被溢写到磁盘中。由于在 Shuffle 阶段 Reduce 任务还没有真正开始执行,因此,这时可以把内存的大部分空间分配给 Shuffle 过程作为缓存。需要注意的是,系统中一般存在多个Map 机器,Reduce 任务会从多个 Map 机器“领取”属于自己处理的那些分区的数据,因此缓存中的数据是来自不同的 Map 机器的,一般会存在很多可以合并的键值对。当溢写过程启动时,具有相同 key 的键值对会被归并,如果用户定义了 Combiner,则归并后的数据还可以执行合并操作,减少写入磁盘的数据量。每个溢写过程结束后,都会在磁盘中生成一个溢写文件,因此磁盘上会存在多个溢写文件。最终,当所有的 Map 端数据都已经被“领取“时,和 Map 端类似,多个溢写文件会被归并成一个大文件,归并的时候还会对键值对进行排序,从而使得最终大文件中的键值对都是有序的。当然,在数据很少的情形下,缓存可以存储所有数据,就不需要把数据溢写到磁盘,而是直接在内存中执行归并操作,然后直接输出给 Reduce 任务。需要说明的是,把磁盘上的多个溢写文件归并成一个大文件可能需要执行多轮归并操作。每轮归并操作可以归并的文件数量是由参数 io.sort.factor 的值来控制的(默认值是 10,用户可以修改)。假设磁盘中生成了 50 个溢写文件,每轮可以归并 10 个溢写文件,则需要经过 5 轮归并,得到 5 个归并后的大文件。

(3)把数据输入给 Reduce 任务。

磁盘中经过多轮归并后得到的若干个大文件,不会继续归并成一个新的大文件,而是直接输入 Reduce 任务,这样可以减少磁盘读写开销。由此,整个 Shuffle 过程顺利结束。接下来,Reduce 任务会执行 Reduce 函数中定义的各种映射,输出最终结果,并将其保存到分布式文件系统中(比如 GFS 或 HDFS)。

小结

MapReduce 采用“分而治之”思想,将大规模数据集拆分为多个小数据块并行处理。其工作流程包括:InputFormat 逻辑切分输入文件,RecordReader 转换为键值对,Map 任务执行自定义映射输出中间结果,Shuffle 过程对结果进行分区、排序、合并与归并,Reduce 任务汇总计算,最后 OutputFormat 输出到分布式文件系统。

Shuffle 是核心环节,分为 Map 端和 Reduce 端。Map 端:输出先写入缓存,溢写时进行分区、排序和可选合并,最终归并成一个大的分区文件。Reduce 端:从各 Map 节点领取属于自己的分区数据,经过归并排序后输入 Reduce 函数。整个过程中,Map 任务注重数据局部性, Reduce 任务无需考虑;中间结果存本地磁盘,最终结果存分布式文件系统。

欢迎 点赞👍 | 收藏⭐ | 评论✍ | 关注🤗

在这里插入图片描述

Logo

葡萄城是专业的软件开发技术和低代码平台提供商,聚焦软件开发技术,以“赋能开发者”为使命,致力于通过表格控件、低代码和BI等各类软件开发工具和服务

更多推荐