论文精读: MapReduce

论文精读 / 2022-10-19

MapReduce: Simplified Data Processing on Large Cluster

论文下载链接:http://nil.csail.mit.edu/6.824/2022/papers/mapreduce.pdf

中文翻译版本: https://pan.baidu.com/s/10my10q9Aq7mrcLrjA2wCiA?pwd=2j44 提取码: 2j44

Abstract

MapReduce是一种用于处理与生成大数据集的编程模型和实现。

MapReduce中分为map functionreduce function。map方法处理kv对,并生成中间kv对(intermediate),reduce方法将中间kv对根据相同的key合并起来。

程序自动化并行地运行在集群上,提供的run-time system会负责:

  1. 对输入数据进行分区
  2. 在集群上调度程序执行
  3. 处理机器故障
  4. 管理机器间通信

Introduction

主要贡献:一个能够实现大规模计算的自动并行和分布的接口,并结合该接口的实现,在大型商用计算集群上实现高性能计算。

Programming Model

用户使用MapReduce库时,需要编写Map和Reduce方法。

  • Map方法:接受一个输入对并产生一组中间kv对。MapReduce库将所有与相同中间key I相关的中间值组合在一起,并将它们传递给Reduce函数。

  • Reduce方法:接受一个中间key I和该key对应的一组value。然后,将这些值合并在一起,形成一个可能更小的value集。

    通常每个reduce方法只有0个或1个输出

Example

以WordCout为例:

map(String Key, String value):
	// key: document name
	// value: document contents
	for each word w in value:
		EmitIntermediate(w, "1");

reduce(String key, Iterator values):
	// key: a word
	// values: a list of counts
	int result = 0;
	for each v in values:
		result += ParseInt(v);
	Emit(AsString(result));

原文还举了一些其他可以使用MapReduce架构的例子:Distributed Grep、Count of URL Access Frequency …

Implementation

MapReduce可以实现在不同的机器上:

  • 小型共享内存机器
  • 大型NUMA多处理器
  • 大规模网络机

本文介绍的是Google使用的大型商用PC集群(以太网相连)

Execution Overview

image-20221019184202939
  1. 用户程序中的MapReduce库首先将输入文件分成M个片段,每个片段通常为16mb64mb(用户可通过可选参数控制)。然后在一个机器集群上启动该程序的多个副本。

  2. 其中一个副本程序是特别的 - master。其余的是都是worker(通过master来分配任务)。Master会把M个map任务和R个Reduce任务分配给空闲的worker

  3. 分配到map任务的worker会读取相应输入(分割后的内容)。然后会解析输入的kv对,并传递给自定义的map函数。`map函数输出的中间kv对被缓存在内存中。

  4. 缓存的中间kv对会定时地partitioning 方法划分为R个分区,然后存储在本地磁盘(GFS)。中间kv对的位置存储信息会被传回给master(后续master会把这些位置信息告诉reduce worker

  5. reduce workermaster通知这些位置时,它使用RPC从map worker的本地磁盘读取缓冲kv对。当reduce worker读取了所有数据后,根据key对数据进行排序,确保相同key的kv对都被分组在一起。

    因为通常许多不同的key会被映射到同一个reduce任务,所以需要根据key进行排序。

    如果数据量太大,无法装入内存,则会使用外部排序。

  6. reduce worker 遍历排好序的中间数据,然后将 key 和其对应的中间 value 集合传递给用户定义的 Reduce函数Reduce函数的输出会写到一个对应输出文件当中。

  7. 当所有的 MapReduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对 MapReduce 调用才返回。

当所有任务完成后,mapreduce 的输出会放在 R 个输出文件当中(每个 reduce 任务对应一个文件)。

通常不需要将输出文件进行合并

输出文件会用作下一个MR程序的输入,或者其他分布式应用。

Master Data Structure

master 会存储以下信息:

  1. 每个task的工作状态 — 空闲 idle 、 工作中 in-progress 、完成 completed
  2. 每个(非空闲)worker的身份 — map、reduce
  3. 中间文件的存储地址与大小(随着map task实时更新)

Fault Tolerance

  • Worker Failure

    master 会定期ping每个worker,如果在一定时间内没有收到响应,则master将该worker标记为failed。由该worker完成的任何map任务都被重置为idle状态,并其他worker上重新调度。同样,正在进行中的mapreduce任务也会被重置为idle`,并重新调度。

    对于已完成的map任务,需要重新执行,因为它们的output存储在宕机机器的本地磁盘上(其他人无法访问)。对于已完成的reduce任务不需要重新执行,因为它们的输出存储在全局文件系统(global file system)中。

    并且 master 还会通知所有reduce worker,若 reduce worker 之前是从故障机器中读取数据的,则现在开始从新的机器读取数据。

  • Master Failure

    可以通过对master做定期检查点,如果挂了,就从最新的检查点重新创建一个master

    本文实现中只有一个master,因此不太容易挂。如果挂了就重启MR程序。

  • Semantics in the Presence of Failures

    当用户提供的 MapReduce 操作是输入确定性函数(即相同的输入产生相同的输出)时,分布式实现顺序执行程序产生的输出是一样的(依靠原子性提交 atomic commit 保证)

    当用户提供的 MapReduce 操作是输入不确定性函数,啥啥啥啥?(暂时没理解)

  • Locality

    为了解决网络带宽的问题,master会优先在有数据副本的机器上进行调度,如果失败了,则会在副本附近机器上进行调度(例如在同一交换机上)。这样做,可以保证在大规模MR操作中,大部分数据都是本地读取的。

  • Task Granularity

    M的设置策略:使得每个独立task达到输入文件的16MB到64MB(优化存储)

    R的设置策略:通常为想要输出的文件数量(参考值为 比worker的数量稍大几倍)

    M = 20000, R = 5000, worker = 2000

  • Backup Tasks

    有时候会有“掉队者 Straggler”拖慢 MapReduce 的执行速度,比如机器的硬盘出现问题,读取速度变慢了,或者是对 CPU 等资源的竞争等导致的。

    这里用一个通用的机制来缓解这种问题。当一个 MapReduce 操作快完成时,master 会安排备用任务 backup task 来执行剩下的 in-progress(一个状态,表明正在执行) 任务,无论是备用任务还是主任务先完成操作,这个任务都会被标记为已完成。通过这种方式来增加快完成的任务的计算资源。实践也证明了这种策略的可行性。

后面都是一些优化以及性能测试,有机会再继续读。