hyperledger fabric v0.6 pbft 源码分析(二)broadcast.go

本贴最后更新于 2686 天前,其中的信息可能已经斗转星移

这部分和 golang 相关的特性紧密相连,所以先大致讲一下 golang 的特性

go goroutine

先看一个例子:

// 例1 func main() { go fmt.Println("routine") fmt.Println("main") // 至此,程序运行结束, // 所有活跃的goroutine被杀死 }

这里的 go 关键字创建了一个 goroutine,它可以理解为一个轻量级线程。当 main 函数执行完时,会杀死所有 goroutine,所以运行这个代码有时候输出:

main

也可能输出:

main
routine

下面是个类似的例子:

// 例2 func main() { go other() fmt.Println("main") time.Sleep(time.Second * 1) // 等待1秒 } func other() { fmt.Println("before") time.Sleep(time.Second * 2) fmt.Println("after") }

这里 after 一定不会被输出,before 可能非常大会输出(除非 1s 等待后,other 还没有执行),main 一定会输出。

但是,这总有不确定的因素,一般来说,通过 sleep 的方式来实现线程之间的通信是不太可能的。下面介绍 golang 的另一个特性-channel

go channel

// 例3 var exitChan = make(chan struct{}) func main() { go other() fmt.Println("main") <-exitChan } func other() { fmt.Println("before") time.Sleep(time.Second * 2) fmt.Println("after") close(exitChan) }

channel 是有类型的,这里定义了一个 struct{} 类型的 channel,定义 channel 需要使用 chan 修饰。这里使用了空结构体的管道:struct{}。这明确地指明该管道仅用于发信号,而不是传递数据。
在主函数中,使用 <-exitChan 来读取 channel 内容,如果 channel 是空的,线程就会被阻塞,当调用 close(exitChan) 关闭管道时,会返回一个零值,使得主函数退出。这段代码一定会输出 3 个单词(顺序可能不一样)。

再看一个类似的例子

// 例4 var exit1Chan = make(chan struct{}) var exit2Chan = make(chan struct{}) func main() { go work1("work1") go work2("work2") <-exit1Chan <-exit2Chan fmt.Println("main") } func work1(text string) { time.Sleep(time.Second * 2) fmt.Println("working:" + text) close(exit1Chan) } func work2(text string) { time.Sleep(time.Second * 2) fmt.Println("working:" + text) close(exit2Chan) }

主 goroutine 会一直等待两个线程全部 完工 后才继续,这是典型的 master/slave 模式的实现。

死锁

再看一个非常相似的例子:

// 例5 var exit1Chan = make(chan struct{}) var exit2Chan = make(chan struct{}) var work1Chan = make(chan struct{}) var work2Chan = make(chan struct{}) func main() { go work1("work1") go work2("work2") <-exit1Chan <-exit2Chan fmt.Println("main") } func work1(text string) { time.Sleep(time.Second * 2) fmt.Println("working:" + text) <-work2Chan work1Chan <- struct{}{} close(exit1Chan) } func work2(text string) { time.Sleep(time.Second * 2) fmt.Println("working:" + text) <-work1Chan work2Chan <- struct{}{} close(exit2Chan) }

work1 与 work2 相互竞争彼此的资源,导致程序死锁。但 golang 对死锁提供了检测机制,使得死锁也不是那么难解决。

select

select 是 Go 语言并发工具集中非常重要的工具。select 用于从一组可能的分支中选择一个进行处理。如果任意一个分支都可以进一步处理,则从中随机选择一个,执行对应的语句。否则,如果又没有默认分支(default case),select 语句则会阻塞,直到其中一个分支可以处理。

// 例6 var okchanel = make(chan bool) func main() { go work1() select { case <-okchanel: fmt.Println("work1 ok") case <-time.After(time.Second * 2): fmt.Println("Time out") } go work2() select { case <-okchanel: fmt.Println("work2 ok") case <-time.After(time.Second * 2): fmt.Println("Time out") } fmt.Println("main") } func work1() { time.Sleep(time.Second) fmt.Print("finished work1\n") okchanel <- true } func work2() { time.Sleep(time.Second * 3) fmt.Print("finished work2\n") okchanel <- true }

输出结果为:

finished work1
work1 ok
Time out
main

work1 由于等待时间短,完成了任务,而 work2 等待时间过长,未完成任务,本例在实际场景中,使用的非常多。

代码分析

回到 hyperledger 来,我们还是从测试看起:

// consensus/pbft/broadcast_test.go func TestBroadcast(t *testing.T) { m := &mockComm{ self: 1, n: 4, msgCh: make(chan mockMsg, 4), } sent := make(map[string]int) go func() { for msg := range m.msgCh { sent[msg.dest.Name]++ } }() b := newBroadcaster(1, 4, 1, time.Second, m) msg := &pb.Message{Payload: []byte("hi")} b.Broadcast(msg) time.Sleep(100 * time.Millisecond) b.Close() sentCount := 0 for _, q := range sent { if q == 1 { sentCount++ } } if sentCount < 2 { t.Errorf("broadcast did not send to all peers: %v", sent) } }

先构造了一个 mockComm,它实现了 communicator 所有接口。

m := &mockComm{ self: 1, n: 4, msgCh: make(chan mockMsg, 4), }

指定了自己的编号 1,节点数 4,消息通道缓冲大小为 4

sent := make(map[string]int) go func() { for msg := range m.msgCh { sent[msg.dest.Name]++ } }()

这里开启了一个 goroutine,一个带有 range 子句的 for 语句会依次读取发往管道的值,直到该管道关闭。这里读取 m.msgCh 后,将对应节点的消息数加一。

b := newBroadcaster(1, 4, 1, time.Second, m) msg := &pb.Message{Payload: []byte("hi")} b.Broadcast(msg) time.Sleep(100 * time.Millisecond) b.Close() sentCount := 0 for _, q := range sent { if q == 1 { sentCount++ } } if sentCount < 2 { t.Errorf("broadcast did not send to all peers: %v", sent) }

构造了一个新的 Broadcaster 产生一个消息,并广播,然后收集消息数为 1 的节点(因为广播要保证消息只能被目标节点群各接收 1 遍),当节点数为 3 的时候表示测试成功。

继续跟踪到源码

func newBroadcaster(self uint64, N int, f int, broadcastTimeout time.Duration, c communicator) *broadcaster { queueSize := 10 // XXX increase after testing chans := make(map[uint64]chan *sendRequest) b := &broadcaster{ comm: c, f: f, broadcastTimeout: broadcastTimeout, msgChans: chans, closedCh: make(chan struct{}), } for i := 0; i < N; i++ { if uint64(i) == self { continue } chans[uint64(i)] = make(chan *sendRequest, queueSize) } // We do not start the go routines in the above loop to avoid concurrent map read/writes for i := 0; i < N; i++ { if uint64(i) == self { continue } go b.drainer(uint64(i)) } return b }

先创建了一个 broadcaster 对象,其中比较关键的是 msgChans 成员,它是一个 map,键对应的是 peer 的 id,值对应的是 sendRequest 类型的 channel,并且将它的缓冲区设置为 queueSize。msgChans 不包括自己 id 的 channel( != self)。

创建完后,针对每一个 id 启动了 go b.drainer(uint64(i))

func (b *broadcaster) drainer(dest uint64) { successLastTime := false destChan, exsit := b.msgChans[dest] // Avoid doing the map lookup every send if !exsit { logger.Warningf("could not get message channel for replica %d", dest) return } for { select { case send := <-destChan: successLastTime = b.drainerSend(dest, send, successLastTime) case <-b.closedCh: for { // Drain the message channel to free calling waiters before we shut down select { case send := <-destChan: send.done <- false b.closed.Done() default: return } } } } }

先取出 id 对应的 channel,然后就是个死循环。

当 destChan 有值的时候,调用 drainerSend 进行发送。
当 b.closedCh 关闭时,将对应的 destChan 的 msg 取出来,置为 false,然后返回。

初始的时候 destChan 没有值,所以阻塞到 send 函数被调用。

  • b.Broadcast(msg)
func (b *broadcaster) Broadcast(msg *pb.Message) error { return b.send(msg, nil) }

继续看

func (b *broadcaster) send(msg *pb.Message, dest *uint64) error { select { case <-b.closedCh: return fmt.Errorf("broadcaster closed") default: } var destCount int var required int if dest != nil { destCount = 1 required = 1 } else { destCount = len(b.msgChans) required = destCount - b.f } wait := make(chan bool, destCount) if dest != nil { b.closed.Add(1) b.unicastOne(msg, *dest, wait) } else { b.closed.Add(len(b.msgChans)) for i := range b.msgChans { b.unicastOne(msg, i, wait) } } succeeded := 0 timer := time.NewTimer(b.broadcastTimeout) // This loop will try to send, until one of: // a) the required number of sends succeed // b) all sends complete regardless of success // c) the timeout expires and the required number of sends have returned outer: for i := 0; i < destCount; i++ { select { case success := <-wait: if success { succeeded++ if succeeded >= required { break outer } } case <-timer.C: for i := i; i < required; i++ { <-wait } break outer } } return nil }

先确定 destCount(目标发送的个数)和 required(pbft 要求的个数 2f+1),然后一个个调用 b.unicastOne(msg, i, wait)(这个函数很简单,就是向 destChan 放入 msg),后面使用一个死循环来监视发送的进程,需要满足 3 个条件之一才能退出循环:

1.收到了 required 个 ok
2.收到了所有的回复
3.如果超时,收到了 required 个消息

与此同时,阻塞的函数 drainer 由于有了 msg,于是调用 drainerSend 进行真正的发送,由于这里具体发送依赖于 Unicast 的实现,测试端已经实现了这个函数,就是将所有需要发送的消息,放入 m.msgCh 所以测试代码最开始才有:

for msg := range m.msgCh { sent[msg.dest.Name]++ }

看到这里基本上逻辑上就通了。还有就是这个 send.done 其实就是 wait 这个 channel,每当一个消息发送成功的时候就向 wait 写入一个 true,否则写入 false。

总的流程如下:
绘图文件
44.png

所以正常情况下,只要收到正确的 2 个以上消息,就会测试成功。

看下一个例子:

func TestBroadcastStuck(t *testing.T) { m := &mockStuckComm{ mockComm: mockComm{ self: 1, n: 4, msgCh: make(chan mockMsg), }, done: make(chan struct{}), } sent := make(map[string][]string) go func() { for msg := range m.msgCh { key := string(msg.msg.Payload) sent[key] = append(sent[key], msg.dest.Name) } }() b := newBroadcaster(1, 4, 1, time.Second, m) maxc := 20 for c := 0; c < maxc; c++ { b.Broadcast(&pb.Message{Payload: []byte(fmt.Sprintf("%d", c))}) } done := make(chan struct{}) go func() { select { case <-done: return case <-time.After(time.Second): t.Fatal("blocked") } }() time.Sleep(100 * time.Millisecond) close(m.done) b.Close() close(done) sendDone := 0 for _, q := range sent { if len(q) >= 2 { sendDone++ } } if sendDone != maxc { t.Errorf("expected %d sent messages: %v", maxc, sent) } }

这个例子和上面的差不多,但是重写了 Unicast,它将自己设定为 vp1,并且当收到 vp0 消息时,总是返回错误(超时或者 channel 关闭错误)。定义了一个新的 map————sent,它将每一个消息及其收到这个消息的节点存起来,最后看节点数超过两个的消息个数等不等于预设的 maxc 值,相等表示测试成功。

for _, q := range sent { fmt.Printf("----%d\n", len(q)) if len(q) >= 2 { sendDone++ } } if sendDone != maxc { t.Errorf("expected %d sent messages: %v", maxc, sent) }

(ps..这里我觉得可以改成 : len(q) > 2)

接下来的 func TestBroadcastUnicast(t *testing.T) 非常简单,就是测试单播的函数

然后

func TestBroadcastAllFail(t *testing.T)
将接收到的消息全部失败,如果不阻塞测试成功,否则会抛出超时的错误。

下面这个例子有点意思

func TestBroadcastTimeout(t *testing.T) { expectTime := 10 * time.Second deltaTime := 50 * time.Millisecond m := &mockIndefinitelyStuckComm{ mockComm: mockComm{ self: 1, n: 4, msgCh: make(chan mockMsg), }, done: make(chan struct{}), } b := newBroadcaster(1, 4, 1, expectTime, m) broadcastDone := make(chan time.Time) beginTime := time.Now() go func() { b.Broadcast(&pb.Message{Payload: []byte(fmt.Sprintf("%d", 1))}) broadcastDone <- time.Now() }() checkTime := expectTime + deltaTime select { case endTime := <-broadcastDone: t.Log("Broadcast consume time: ", endTime.Sub(beginTime)) close(broadcastDone) close(m.done) return case <-time.After(checkTime): close(broadcastDone) close(m.done) t.Fatalf("Broadcast timeout after %v, expected %v", checkTime, expectTime) } }

它在启动的时候,设置了一个带有超时时间的 Broadcaster,然后将得到当前时间记为 beginTime,调用发送广播的函数,而函数一定 vp0 阻塞,且 vp2,vp3 立即失败,所以 Broadcaster 超时后返回,返回后将当前时间写下来,看一共花了多久。误差不超过 expectTime + deltaTime,算测试成功。

下一个测试 TestBroadcastIndefinitelyStuck 与之前的比较类似,区别在于把超时时间降低了,然后让它不断超时,但是不能超时到 10s,若正常退出则成功。

总结

通过几个测试的案例,基本上覆盖了全部的源码,从中不仅学到了源码的设计思想,而且学到了测试的一些方法。

  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    498 引用 • 1395 回帖 • 254 关注
  • hyperledger
    3 引用

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...