go 语言中 for 循环的并发安全问题

本贴最后更新于 1660 天前,其中的信息可能已经水流花落

hello,大家好,欢迎来到银之庭。我是 Z,一个普通的程序员。今天我们来看一个我在工作中刚发现的 go 语言里 for 循环的一个问题。

1. 结论

先说结论,用尽可能简练的语言描述就是:在 go 语言中,用 for 循环创建子协程,用 errgroup 管理这些协程,并会向子协程中传递参数的情况下,有可能产生并发安全问题,需要复制一下循环中的临时变量传给子协程,而不能直接把临时变量传过去。

2. 现象

在某个 go 语言开发的工作项目中,我实现了个小需求,代码逻辑抽象出来大概是这样:

package main import ( "context" "fmt" "golang.org/x/sync/errgroup" "time" ) type Param struct { Id int64 } func test1() { // 传递参数的管道 ch := make(chan *Param) // 创建errgroup,便于实现主协程等待子协程的功能 gCtx, _ := context.WithTimeout(context.Background(), 1*time.Hour) g, gCtx := errgroup.WithContext(gCtx) // 起子协程往管道里不断传递参数 g.Go(func() error { return send(ch) }) // 在主协程里从管道中取出参数,然后对每个参数起一个子协程去处理 for param := range ch { fmt.Printf("get param: %+v\n", param) g.Go(func() error { return dealParam(param) }) } // 等待所有子协程处理完毕 if err := g.Wait(); err != nil { fmt.Printf("err: %+v\n", err) } } func send(ch chan *Param) error { for i := 1; ; i++ { p := Param{Id: int64(i)} ch <- &p } } func dealParam(param *Param) error { fmt.Printf("deal param: %+v\n", param) return nil }

业务逻辑在代码中都已经注释出了,这里为了方便演示,直接死循环生产参数流,实际的业务代码里参数流是有限的。在从管道中接到参数和处理参数的地方我打了日志,尝试运行一下后通过看这两处的日志可以发现问题。我的一次运行的输入如下:

image.png

从上面红框里的内容可以很明显地看到,id 为 71648 的参数从管道中获取了一次,但却被两个子协程处理了,而且再仔细观察会发现,71646 和 71648 都被处理了两次,而 71647 明明从管道中取出来了,却一次都没被处理。可能有些小伙伴已经猜到原因了,这不是重复执行的问题,而是主协程和子协程并发读写了某个变量,导致子协程读取到了被覆盖后的内容。

3. 本质

问题实际出在 for 循环这里:

for param := range ch { fmt.Printf("get param: %+v\n", param) g.Go(func() error { return dealParam(param) }) }

根据现象可以想象到,go 语言对 for 循环的实现是:

  1. 开辟出一块内存空间,作为临时变量 param;
  2. 每次从管道中取到一个值,覆盖到 param 这块内存空间里。

g.Go() 函数内部是起了个子协程去运行我们传入的函数的,这样一来,在我们的函数运行时,param 的值如果已经被后续的值覆盖了,那么前后两个子协程就拿到相同的 param 的值了,而之前那个没来得及处理的 param 值,就没机会得到处理了。

下面我们来验证一下。主要是验证 param 这个临时变量的内存地址在每次循环中是否是同一个,可以在打印语句中把 &param 的值打印出来,如下:

for param := range ch { fmt.Printf("get param: %+v\n", &param) g.Go(func() error { return dealParam(param) }) }

我一次运行的结果如下:

image.png

可以看出,临时变量 param 确实是同一块内存地址,不断覆盖。这里额外说一点,我们的函数 dealParam 里接收到的 param 参数,是复制了一个传进来的,并不是原来的内存地址。

4. 解决方案

知道了原因,解决就比较简单了,只要在 g.Go() 前,把 param 临时变量复制一份出来,把复制的变量传到子协程里就行了,因为复制的变量不会被覆盖,所以就避免了这个问题,代码如下:

for param := range ch { // 复制param,防止param的值被后续的新值覆盖 tempParam := param fmt.Printf("get param: %+v\n", tempParam) g.Go(func() error { return dealParam(tempParam) }) }

再次运行,就可以发现没有上面的重复处理参数和漏处理参数的问题了:

image.png

这里还有个小细节:管道里传递的是参数对象的指针,而不是参数对象本身,这样在复制 param 时,就可以值复制一个指针,而不是复制一个对象,减少复制的开销。

当然,只有使用 errgroup 时才会有这个问题(下面会讲到),所以也可以直接使用 go 关键字起子协程,不用 errgroup,不过就需要自己做协程间的同步了。

5. 扩展问题

5.1 普通循环是否有问题

上面的 for 循环是从管道里取数据,如果直接对一个列表进行遍历,还会不会有这种情况呢?我们来实验一下,代码如下:

func test2() { params := []*Param{} for i := 0; i < 100; i++ { params = append(params, &Param{Id: int64(i)}) } gCtx, _ := context.WithTimeout(context.Background(), 1*time.Hour) g, gCtx := errgroup.WithContext(gCtx) for _, param := range params { g.Go(func() error { return dealParam(param) }) } if err := g.Wait(); err != nil { fmt.Printf("err: %+v\n", err) } }

我一次运行的结果如下:

image.png

可以发现,问题更加严重,因为主协程执行更快,会有更多子协程拿到同样的 param 的值。所以,可以确定,这是 fo 语言里 for 循环实现的特性,和管道无关只要是 for 循环创建协程,并传入了变量,就需要注意变量并发读写的问题

5.2 go 关键字是否有问题

上面的例子中,我们都使用了 errgroup 的 API,如果不用 errgroup,直接 go 出子协程呢?还会有这种问题吗?我们来实验一下,代码如下:

func test3() { params := []*Param{} for i := 0; i < 100; i++ { params = append(params, &Param{Id: int64(i)}) } for _, param := range params { go dealParam(param) } time.Sleep(10 * time.Second) }

这里为了方便,直接在主协程里休眠了 10 秒,确保子协程都执行完,实际上应该用锁来实现协程间同步。运行之后会发现,没有上面的问题,可以合理猜测:使用 go dealParam(param) 这句代码时,复制一个新参数传给 dealParam() 的动作也是主协程里做的,所以没有并发读写 param 的问题,而用 errgroup 的 API g.Go() 时,复制一个新参数传给 dealParam() 的动作是在子协程里做的,所以会产生并发读写 param 的问题

5.3 Java 是否有同样的问题

我写 Java 的时间更长,但好像从来没注意过这种问题,是有过这样的 bug 但我没注意,还是 Java 里就不存在这样的问题呢?我们来验证一下,代码如下:

import lombok.Builder; import lombok.Data; import java.util.concurrent.CountDownLatch; public class Test { public static void main(String[] args) throws InterruptedException { Param[] params = new Param[1000]; for (int i = 0; i < 1000; i++) { params[i] = Param.builder().id(i).build(); } CountDownLatch countDownLatch = new CountDownLatch(1000); for (Param param : params) { // System.identityHashCode可以打印对象的内存地址 System.out.println("get param: " + System.identityHashCode(param)); Worker worker = new Worker(param, countDownLatch); new Thread(worker).start(); } countDownLatch.await(); } } @Data @Builder class Param { private Integer id; } class Worker implements Runnable { private Param param; private CountDownLatch countDownLatch; public Worker(Param param, CountDownLatch countDownLatch) { this.param = param; this.countDownLatch = countDownLatch; } @Override public void run() { System.out.println("deal param: " + param.toString()); countDownLatch.countDown(); } }

大家如果运行一下会发现,并没有上面的问题,而且打印出来的 param 临时变量的地址并不是同一个,可以合理猜测:Java 的 for 循环并不会创建一个临时变量接收循环到的值,而是直接用原对象做处理

  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    498 引用 • 1395 回帖 • 248 关注
  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...