6.824 Lab1 MapReduce

计算机系统 / 2022-10-23

Github: https://github.com/Alex-Shen1121/MIT-6.824

Lab地址:http://nil.csail.mit.edu/6.824/2022/labs/lab-mr.html

介绍

在这个实验中,将构建一个MapReduce系统,并实现一个调用 MapReduce 函数并处理文件读写的 worker 进程,以及一个 coordinator 进程,它将任务分发给 worker 进程并处理挂掉的 worker 进程。

参考论文:MapReduce论文 (注意:本实验中使用的是“coordinator”而不是论文的“master”。)

快速开始

  1. 安装Go环境

    Go的环境安装方法,参考各个系统安装攻略

    mac可以使用homebrew进行安装 brew install go

  2. 下载源代码

    $ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
    $ cd 6.824
    $ ls
    Makefile src
    $
    
  3. 快速测试

    mrapps/ 下的文件打包成build成插件,并运行 mrsequential.go

    $ cd ~/6.824
    $ cd src/main
    $ go build -race -buildmode=plugin ../mrapps/wc.go
    $ rm mr-out*
    $ go run -race mrsequential.go wc.so pg*.txt
    $ more mr-out-0
    A 509
    ABOUT 2
    ACT 8
    ...
    

    最后WordCount的结果写在了mr-out-0中。

Lab 任务

在开始代码编写之前,补充一些可能用到的数据结构(后续可能会修改)

  • 定义 Task 任务

    在 mr/worker.go 中定义Task数据结构。其中包括该任务所操作的文件、任务的类型(Map或Reduce)、任务唯一ID标识、任务时间戳。

    // Task info
    type Task struct {
    	Filename  string
    	TaskType  int
    	TaskID    int
    	TimeStamp int64 // in seconds
    }
    
    // Task type
    const (
    	Map    = 0
    	Reduce = 1
    	Wait   = 2
    	Completed = 3
    )
    
  • 定义 Coordinator

    利用map数据结构来存储Map与Reduce任务队列,并且定义一个mutex锁来保证队列修改时只有一个进程访问。

    type Coordinator struct {
    	// Your definitions here.
    	mutex sync.Mutex
    
    	mapTasksReady      map[int]Task
    	mapTasksInProgress map[int]Task
    
    	reduceTasksReady      map[int]Task
    	reduceTasksInProgress map[int]Task
    }
    
  • 定义 RPC

    worker 与 coordinator 通过RPC进行进程通讯

    // RPCArgs ...
    type RPCArgs struct {
    	TaskInfo Task
    }
    
    // RPCReply ...
    type RPCReply struct {
    	TaskInfo Task
    }
    

Worker

Worker整体架构

根据Lab hints第一条可知,我们需要首先建立Worker与coordinator之间的RPC通信

One way to get started is to modify mr/worker.go’s Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

在worker.go中提供了CallExample方法,作为RPC调用的demo。根据该方法改写一下,建立RPC通信。同时在coordinator.go中,补充GiveTask方法(临时)。这样就可以成功调用了。

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	//// Your worker implementation here.
	// 不断向coordinator更新状态
	for {
		args := RPCArgs{}
		reply := RPCReply{}

		// 向coordinator索取任务
		ok := call("Coordinator.GiveTask", &args, &reply)

		if ok {
			fmt.Printf("reply.TaskInfo : %v\n", reply.TaskInfo)
		} else {
			fmt.Printf("RPC 调用失败")
		}
	}
}
func (c *Coordinator) GiveTask(args *RPCArgs, reply *RPCReply) error {
	reply.TaskInfo = args.TaskInfo
	return nil
}
image-20221022165812672

根据reply返回值,重新搭建Worker整体架构。根据reply中的task类型,分配不同的方法。暂时想到了四种可能的工作状态:

  • Map:执行doMap方法
  • Reduce:执行doReduce方法
  • Wait:表明当前没有Task需要执行,sleep 1秒后重新请求
  • Completed:表明所有Task都完成了,直接挂了就好了
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	//// Your worker implementation here.
	// 不断向coordinator更新状态
	for {
		args := RPCArgs{}
		reply := RPCReply{}

		// 向coordinator索取任务
		ok := call("Coordinator.GiveTask", &args, &reply)

		if ok {
			//fmt.Printf("reply.TaskInfo : %v\n", reply.TaskInfo)
			switch reply.TaskInfo.TaskType {
			// 执行Map任务
			case Map:
				doMap(&reply.TaskInfo, mapf)
			// 执行Reduce任务
			case Reduce:
				doReduce(&reply.TaskInfo, reducef)
			// 闲置
			case Wait:
				fmt.Printf("当前 Worker 空闲")
				time.Sleep(time.Second)
				continue
			// 所有Task完成
			case Completed:
				fmt.Printf("完成所有 Task 任务")
				break
			}
			// 告知coordinator任务完成
			args.TaskInfo = reply.TaskInfo
			call("Coordinator.TaskDone", &args, &reply)
		} else {
			fmt.Printf("RPC 调用失败")
		}
	}
}

doMap 方法

接下来一步就是完成doMap方法。在Map方法中,会将中间文件存储为mr-X-Y的格式,其中X为当前任务ID,即TaskID;Y为分配给的Reduce任务的ID。

A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

因此需要在Task中添加reduce任务数量

// Task info
type Task struct {
	Filename  string
	TaskType  int
	TaskID    int
	TimeStamp int64 // in seconds
	NReduce   int // reduce 任务数量
}

然后就是编写doMap方法,可以参照mrsequential.go中的写法

// mrsequential.go 中对于Map阶段的描述

	intermediate := []mr.KeyValue{}
	for _, filename := range os.Args[2:] {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(content))
		intermediate = append(intermediate, kva...)
	}

根据上面的方法,改下成以下代码

  • 首先读取文件,并生成中间临时变量intermediate。
    第一维代表reduce task的id,第二维保存中间变量。
  • 然后将kv对通过ihash方法映射到对应的reduce task中
  • 写入本地文件中(在实际MR程序中,应该写入GFS中,此处为了简化直接写入本地)
// Map方法
func doMap(task *Task, mapf func(string, string) []KeyValue) {
	fmt.Printf("Map worker get task %d-%s\n", task.TaskID, task.Filename)

	// 创建中间临时文件
	// intermediate[i][] 说明 传入第i个 reduce task
	// 第二维表示中间结果
	intermediate := make([][]KeyValue, task.NReduce)
	for i := 0; i < task.NReduce; i++ {
		intermediate[i] = make([]KeyValue, 0)
	}
	filename := task.Filename
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()

	// 执行map方法并写入中间文件
	kva := mapf(filename, string(content))
	for _, kv := range kva {
		intermediate[ihash(kv.Key)%task.NReduce] =
			append(intermediate[ihash(kv.Key)%task.NReduce], kv)
	}

	// 将intermediate写入临时文件夹
	// 命名规则参考 hint 第6条
	for i := 0; i < task.NReduce; i++ {
		// 改分区没有value要写
		if len(intermediate[i]) == 0 {
			continue
		}
		outputFileName := fmt.Sprintf("mr-%d-%d", task.TaskID, i)
		outputFile, _ := ioutil.TempFile("./", "tmp_")
		//outputFile, err := os.OpenFile("./tmp/"+outputFileName, os.O_WRONLY|os.O_CREATE, 0666)
		if err != nil {
			fmt.Printf("文件创建失败")
		}
		enc := json.NewEncoder(outputFile)
		for _, kv := range intermediate[i] {
			err := enc.Encode(&kv)
			if err != nil {
				log.Fatalf("Json encode error: Key-%s, Value-%s", kv.Key, kv.Value)
			}
		}
		outputFile.Close()
		os.Rename(outputFile.Name(), outputFileName)
	}
}

doReduce 方法

同样,需要增加MMap属性,告知map task的数量

// Task info
type Task struct {
	Filename  string
	TaskType  int
	TaskID    int
	TimeStamp int64 // in seconds
	NReduce   int   // reduce 任务数量
	MMap      int   // map 任务数量
}

然后就是编写doReduce方法,可以参照mrsequential.go中的写法

	sort.Sort(ByKey(intermediate))

	oname := "mr-out-0"
	ofile, _ := os.Create(oname)

	//
	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-0.
	//
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	ofile.Close()

根据上面的方法,改下成以下代码

  • 创建临时变量,然后读取临时文件
  • 根据key值进行排序
  • 写到output文件中
// Reduce方法
func doReduce(task *Task, reducef func(string, []string) string) {
	fmt.Printf("Reduce worker get task %d\n", task.TaskID)

	// 从tmp文件中读取中间文件
	intermediate := make([]KeyValue, 0)
	for i := 0; i < task.MMap; i++ {
		// 读取文件
		inputFileName := fmt.Sprintf("mr-%d-%d", i, task.TaskID)
		inputFile, err := os.OpenFile("./tmp/"+inputFileName, os.O_RDWR, 0666)
		if err != nil {
			fmt.Printf("打开文件失败")
		}
		dec := json.NewDecoder(inputFile)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}

	}

	sort.Sort(ByKey(intermediate))

	oname := fmt.Sprintf("mr-out-%d", task.TaskID)
	ofile, _ := ioutil.TempFile("./", "tmp_")

	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	ofile.Close()
	os.Rename(ofile.Name(), oname)
}

Coordinator

初始化 Coordinator

第一步先完成 func MakeCoordinator(files []string, nReduce int) *Coordinator

补齐coordinator的初始化方法

每个输入的文件都作为一个单独的Map任务

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	// 初始化队列
	c.mapTasksReady = make(map[int]Task)
	c.reduceTasksReady = make(map[int]Task)
	c.mapTasksInProgress = make(map[int]Task)
	c.reduceTasksInProgress = make(map[int]Task)

	// 初始化
	numFile := len(files)
	for i, file := range files {
		c.mapTasksReady[i] = Task{
			Filename:  file,
			TaskType:  Map,
			TaskID:    i,
			NReduce:   nReduce,
			MMap:      numFile,
			TimeStamp: time.Now().Unix()}
	}
	//c.reduceReady = false // 这个字段还没提到,待会儿说
	c.NReduce = nReduce
	c.MMap = numFile

	c.server()
	return &c
}

然后补齐完成上一部分中预留的RPC

GiveTask 方法

整个MapReduce阶段分为Map和Reduce阶段。在之前的定义过程中,我们设置了四个数据结构。

分配任务包含如下步骤:

  1. 检查挂了的任务,然后重新分配到Ready队列中

  2. 处于Map阶段

    • Ready队列中还有任务,则分配给worker,并且加入InProgress队列。
    • Ready队列中无任务(全部完成),InProgress队列有任务,则等待1s,待Map阶段完成后再进入Reduce阶段

    当InProgress也无任务时,则说明Map阶段全部完成。

    利用reduceReady变量记录Map阶段是否完成。

    生成所有Reduce任务,并加入reduceTasksReady队列

  3. 处于Reduce阶段

    • Ready队列中还有任务,则分配给worker,并且加入InProgress队列。
    • Ready队列中无任务(全部完成),InProgress队列有任务,则等待1s,等待Reduce阶段完成。
    • Ready与InProgress队列清空后,则通知worker任务完成,
// GiveTask : 返回reply,告知worker任务信息
func (c *Coordinator) GiveTask(args *RPCArgs, reply *RPCReply) error {
	// 操作队列的时候要加锁
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// 重启挂了的任务(超过10s)
	curTime := time.Now().Unix()
	for i, task := range c.mapTasksInProgress {
		if curTime-task.TimeStamp > 10 {
			c.mapTasksReady[i] = task
			delete(c.mapTasksInProgress, i)
			fmt.Printf("重启Map任务 %d", i)

		}
	}
	for i, task := range c.reduceTasksInProgress {
		if curTime-task.TimeStamp > 10 {
			c.reduceTasksReady[i] = task
			delete(c.reduceTasksInProgress, i)
			fmt.Printf("重启Reduce任务 %d", i)

		}
	}

	// 处于map阶段
	if len(c.mapTasksReady) > 0 {
		// 因为map遍历是随机的,因此随机取一个任务
		for i, task := range c.mapTasksReady {
			// 设置当前task的时间戳
			task.TimeStamp = time.Now().Unix()
			// 将这个任务发给reply
			reply.TaskInfo = task
			// 将这个任务转移到inProgress队列
			c.mapTasksInProgress[i] = task
			delete(c.mapTasksReady, i)
			fmt.Printf("发送Map任务给Worker", reply.TaskInfo)
			return nil
		}
	} else if len(c.mapTasksInProgress) > 0 {
		// 仍然有map任务在执行,则等待
		reply.TaskInfo = Task{TaskType: Wait}
		return nil
	}

	// 检测是否完成map阶段
	// 在进入reduce阶段前,要生成所有的Reduce Task
	if !c.reduceReady {
		for i := 0; i < c.NReduce; i++ {
			c.reduceTasksReady[i] = Task{
				TaskType:  Reduce,
				TaskID:    i,
				NReduce:   c.NReduce,
				MMap:      c.MMap,
				TimeStamp: time.Now().Unix()}
		}
		c.reduceReady = true
	}

	// 处于reduce阶段
	if len(c.reduceTasksReady) > 0 {
		// 因为map遍历是随机的,因此随机取一个任务
		for i, task := range c.reduceTasksReady {
			// 设置当前task的时间戳
			task.TimeStamp = time.Now().Unix()
			// 将这个任务发给reply
			reply.TaskInfo = task
			// 将这个任务转移到inProgress队列
			c.reduceTasksInProgress[i] = task
			delete(c.reduceTasksReady, i)
			fmt.Printf("发送Reduce任务给Worker", reply.TaskInfo)
			return nil
		}
	} else if len(c.reduceTasksInProgress) > 0 {
		// 仍然有reduce任务在执行,则等待
		reply.TaskInfo = Task{TaskType: Wait}
		return nil
	} else {
		// 完成了所有任务
		reply.TaskInfo = Task{TaskType: Completed}
	}

	return nil
}

TaskDone 方法

worker会告知Coordinator完成了Task,通过该方法可以判断Map或Reduce任务,并且从队列中删除任务。

// 根据args判断任务完成情况
func (c *Coordinator) TaskDone(args *RPCArgs, reply *RPCReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	switch args.TaskInfo.TaskType {
	case Map:
		delete(c.mapTasksInProgress, args.TaskInfo.TaskID)
		fmt.Printf("Map task %d done, %d tasks left\n",
			args.TaskInfo.TaskID, len(c.mapTasksInProgress)+len(c.mapTasksReady))
	case Reduce:
		delete(c.reduceTasksInProgress, args.TaskInfo.TaskID)
		fmt.Printf("Reduce task %d done, %d tasks left\n",
			args.TaskInfo.TaskID, len(c.reduceTasksInProgress)+len(c.reduceTasksReady))
	}
	return nil
}

Done 方法

如果四个队列全部清空,则说明完成了所有的任务

func (c *Coordinator) Done() bool {
	ret := false

	// Your code here.
	if len(c.mapTasksReady) == 0 &&
		len(c.mapTasksInProgress) == 0 &&
		len(c.reduceTasksReady) == 0 &&
		len(c.reduceTasksInProgress) == 0 {
		ret = true
	}

	return ret
}

遇到的问题

  1. 在使用Goland进行代码调试的时候,运行代码是正常的。

    运行参数,不能使用正则

    需要改成:wc.so pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt

    Goland调试配置文件如下所示:

    image-20221019032131576

    但是调试的时候会出现plugin was built with a different version of package internal/cpu" 的问题。经过排查,发现是哈希验证时发生了问题。为了简单快速地解决问题,将go源码中runtime/plugin.go中第51-56行(如下所示)注释掉,即可成功调试。

    for _, pkghash := range md.pkghashes {
    		if pkghash.linktimehash != *pkghash.runtimehash {
    			md.bad = true
    			return "", nil, "plugin was built with a different version of package " + pkghash.modulename
    		}
    	}
    

参考资料

  1. https://www.guodong.plus/2020/1227-214432/

  2. [https://zhuanlan.zhihu.com/p/260752052](