【6.824 笔记】LEC2: RPC and Threads

计算机系统 / 2022-10-17

为什么要用Go?

  • 对线程的良好支持
  • 方便的RPC
  • 类型安全
  • 收集的垃圾(释放问题后没有用)。
  • 线程+GC特别有吸引力!
  • 相对简单

线程 Threads

一个有用的结构化工具(structuring tool),但可能很棘手

Go称其为goroutines;其他人则称其为线程。

线程 = “执行的线程”(thread of execution)

线程允许一个程序同时做很多事情,每个线程都是串行执行的,就像一个普通的非线程程序。
线程会共享内存,每个线程都包括一些每线程的状态:程序计数器、寄存器、堆栈

为什么要用多线程?

它们表达了并发性,这是你在分布式系统中需要的。

  1. I/O并发性

客户端向许多服务器并行发送请求并等待回复。
服务器处理多个客户端请求;每个请求都可能被阻塞。
在等待磁盘为客户X读取数据时,可以处理一个来自客户Y的请求。

  1. 多核性能
    在几个核心上并行地执行代码。

  2. 方便
    在后台,每秒钟一次,检查每个工作者是否仍然活着。

线程的问题。

  1. 共享数据
    例如,如果两个线程同时做n = n + 1怎么办?或者,一个线程在读,另一个线程在增量?

解决方法:

  • 使用锁(Go的sync.Mutex)。
  • 或避免分享易变数据
  1. 线程之间的协调
    例如,一个线程在生产数据,另一个线程在消费数据。

消费者如何能够等待(并释放CPU)?生产者如何能唤醒消费者?

解决方法:

  • 使用Go channel或sync.Cond或WaitGroup
  1. 死锁
    锁 / 通信之间的调度关系陷入死循环(如RPC或Go channel)
    image-1666003630036

网页爬虫案例解析

老师通过以下的网页爬虫案例,来展示线程:

package main

import (
	"fmt"
	"sync"
)

//
// Several solutions to the crawler exercise from the Go tutorial
// https://tour.golang.org/concurrency/10
//

//
// Serial crawler
//

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
	}
	return
}

//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
	f.mu.Lock()
	already := f.fetched[url]
	f.fetched[url] = true
	f.mu.Unlock()

	if already {
		return
	}

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
	for _, u := range urls {
		done.Add(1)
        u2 := u
		go func() {
			defer done.Done()
			ConcurrentMutex(u2, fetcher, f)
		}()
		//go func(u string) {
		//	defer done.Done()
		//	ConcurrentMutex(u, fetcher, f)
		//}(u)
	}
	done.Wait()
	return
}

func makeState() *fetchState {
	f := &fetchState{}
	f.fetched = make(map[string]bool)
	return f
}

//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func master(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	master(ch, fetcher)
}

//
// main
//

func main() {
	fmt.Printf("=== Serial===\n")
	Serial("http://golang.org/", fetcher, make(map[string]bool))

	fmt.Printf("=== ConcurrentMutex ===\n")
	ConcurrentMutex("http://golang.org/", fetcher, makeState())

	fmt.Printf("=== ConcurrentChannel ===\n")
	ConcurrentChannel("http://golang.org/", fetcher)
}

//
// Fetcher
//

type Fetcher interface {
	// Fetch returns a slice of URLs found on the page.
	Fetch(url string) (urls []string, err error)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
	if res, ok := f[url]; ok {
		fmt.Printf("found:   %s\n", url)
		return res.urls, nil
	}
	fmt.Printf("missing: %s\n", url)
	return nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"http://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}

两种风格的解决方案:

串行爬虫:

通过递归序列调用进行DFS,同时利用Fetch这个Map来判断网页是否取到过

	if fetched[url] {
		return
	}
	fetched[url] = true

当我们给Serial()加上goroutine时,会取到多次网页
image-1666015497225

ConcurrentMutex爬虫。

为每个获取的页面创建一个线程
许多并发的取数,更高的取数率
"go func "创建了一个goroutine并开始运行。
func…是一个 “匿名函数”
这些线程共享 "获取的 "地图
因此,只有一个线程会获取任何特定的页面
为什么要使用Mutex(Lock()和Unlock())?
一个原因是。
两个不同的网页包含指向同一个URL的链接
两个线程同时获取这两个页面
T1读取的是[url],T2读取的是[url]。
都看到网址没有被取走(已经假)。
两者都获取,这是不对的
该锁导致检查和更新是原子性的
所以只有一个线程看到已经
false
另一个原因。
在内部,map是一个复杂的数据结构(树?可扩展的哈希?)
并发的更新/更新可能会破坏内部的不变性
同时进行的更新/读取可能会使读取崩溃
如果我把Lock()/Unlock()注释掉怎么办?
运行crawler.go
为什么它能发挥作用?
go run -race crawler.go
即使在输出正确的情况下,也会检测到竞赛!
ConcurrentMutex爬虫如何决定它已经完成?
同步.等待组
Wait()等待所有的Add()s被Done()s平衡。
即等待所有子线程的完成
[图示:goroutines的树,覆盖在循环URL图上] 。
树上的每个节点都有一个WaitGroup。
这个爬虫可能会创建多少个并发线程?

并行通道爬行器
a围棋频道。
通道是一个对象
ch := make(chan int)
一个通道让一个线程向另一个线程发送一个对象
ch <- x
发件人等待,直到某个goroutine收到
y := <- ch
对于y :=范围ch
接收者等待,直到某个goroutine发送了
渠道既能沟通又能同步
几个线程可以在一个通道上发送和接收。
渠道很便宜
记住:发送方阻断,直到接收方收到为止!
“同步”
谨防僵局
并发通道master()
master()创建一个worker goroutine来获取每个页面。
worker()在一个通道上发送页面URL的片断
多个工人在单一通道上发送
master()从通道中读取URL片断
主人在哪条线上等待?
主站在等待时是否使用CPU时间?
不需要锁定取来的地图,因为它不是共享的!
主人怎么知道它已经完成了?
保持n中工人的数量。
每个工人正好在通道上发送一个项目。

为什么多个线程使用同一个通道不是一场比赛?

当工人线程写进URL的片断时,是否存在竞赛。
而主线程读取该片断,不需要锁定?

  • 工作者只在*发送前写入分片
    主站只在接收后读取分片
    所以他们不能同时使用切片。

什么时候使用共享和锁,而不是渠道?
大多数问题都可以用两种方式解决
什么是最有意义的,取决于程序员如何思考
状态-共享和锁
沟通-渠道
对于6.824实验室,我建议共享+锁的状态。
和sync.Cond或channels或time.Sleep()用于等待/通知。

远程程序调用(RPC)
分布式系统机制的一个关键部分;所有实验室都使用RPC
目标:易于编程的客户/服务器通信
隐藏网络协议的细节
将数据(字符串、数组、地图等)转换成 “线格式”

RPC消息图。
客户端服务器
要求—>
<–回应

软件结构
客户端应用程序处理程序fns
桩基消防队调度员
RPC lib RPC lib
网 ------------ 网

去的例子:时间表上的kv.go页面
一个玩具钥匙/价值存储服务器 – Put(key,value), Get(key)->value
使用Go的RPC库
常见的是。
为每个服务器处理程序声明Args和Reply结构。
客户。
connect()的Dial()创建一个与服务器的TCP连接。
get()和put()是客户端的 “存根”
Call()要求RPC库执行调用。
你可以指定服务器函数的名称、参数、放置回复的位置。
库中的marshalls args,发送请求,等待,取消marshalls reply
Call()的返回值表明它是否得到了回复
通常你也会有一个 reply.Err 表示服务级别的失败。
服务器。
Go要求服务器声明一个带有方法的对象作为RPC处理程序
然后服务器将该对象注册到RPC库中
服务器接受TCP连接,将其交给RPC库
RPC库
读取每个请求
为这个请求创建一个新的goroutine
沼泽地要求
查找命名的对象(在Register()创建的表中)。
调用对象的命名方法(dispatch)。
marshalls回复
在TCP连接上写入回复
服务器的Get()和Put()处理程序
必须锁定,因为RPC库为每个请求创建一个新的goroutine
读取args; 修改回复

一些细节。
绑定:客户机如何知道要与哪台服务器计算机对话?
对于Go的RPC,服务器名称/端口是Dial的一个参数
大型系统有某种名称或配置服务器
集合:将数据格式化为数据包
Go的RPC库可以传递字符串、数组、对象、地图,等等。
Go通过复制指向的数据来传递指针。
不能传递通道或功能

RPC问题:如何应对失败?
例如,丢失数据包、网络故障、服务器速度慢、服务器崩溃等。

对于客户端RPC库来说,失败是什么样子的?
客户端从未看到服务器的响应
客户端并不*知道服务器是否看到了这个请求!
[各点的损失示意图]
也许服务器从未见过这个请求
可能是服务器执行了,在发送回复前崩溃了
可能是服务器执行了,但在发送回复之前网络就中断了。

最简单的故障处理方案。“尽力而为”
Call()等待响应一段时间
如果没有到达,则重新发送请求
多做几次
然后放弃并返回一个错误

问:"最佳努力 "对应用程序来说是否容易应对?

一个特别糟糕的情况。
客户端执行
Put(“k”, 10);
Put(“k”, 20);
双双成功
Get(“k”)会产生什么?
[图,超时,重发,原件晚到]

问:最好的努力是否可以?
只读操作
如果重复操作,则什么都不做
例如,DB检查记录是否已经被插入

更好的RPC行为。“最多一次”
理念:服务器RPC代码检测重复请求
返回之前的回复,而不是重新运行处理程序
问:如何检测一个重复的请求?
客户端在每个请求中包括唯一的ID(XID)。
使用相同的XID进行再发送
服务器。
如果看到[xid]。
r = old[xid]
否则
r = handler()
old[xid] = r
seen[xid] = true

一些最一次的复杂情况
这将在实验室3中出现。
如果两个客户使用同一个XID怎么办?
大的随机数?
将唯一的客户ID(ip地址)与序列#结合起来?
服务器最终必须丢弃有关旧RPC的信息
什么时候丢弃是安全的?
的想法。
每个客户都有一个独特的ID(也许是一个大的随机数)。
每个客户的RPC序列号
客户端在每个RPC中包括 “看到的所有回复<=X”。
很像TCP序列号和acks
或者一次只允许客户端有一个未完成的RPC
seq+1的到来允许服务器丢弃所有<= seq的内容。
如何在原请求仍在执行的情况下处理重复的请求?
服务器还不知道回复
理念:每个正在执行的RPC的 "待定 "标志;等待或忽略

如果一个最多只有一次的服务器崩溃并重新启动怎么办?
如果内存中的信息最多只有一次重复,服务器会忘记
并在重新启动后接受重复的请求
也许它应该把重复的信息写到磁盘上
也许复制服务器也应该复制重复的信息

Go RPC是 "最多一次 "的一种简单形式
打开TCP连接
向TCP连接写请求
Go RPC永远不会重新发送一个请求
这样服务器就不会看到重复的请求
如果没有得到回复,Go RPC代码会返回一个错误。
也许在超时后(来自TCP)
也许服务器没有看到请求
也许服务器处理了请求,但在回复回来之前,服务器/网络发生了故障。

那 "正好一次 "呢?
无限制的重试加重复检测加容错服务
实验室3