hello,大家好,欢迎来到银之庭。我是 Z,一个普通的程序员。今天我们来看一个我在工作中刚发现的 go 语言里 for 循环的一个问题。
1. 结论
先说结论,用尽可能简练的语言描述就是:在 go 语言中,用 for 循环创建子协程,用 errgroup 管理这些协程,并会向子协程中传递参数的情况下,有可能产生并发安全问题,需要复制一下循环中的临时变量传给子协程,而不能直接把临时变量传过去。
2. 现象
在某个 go 语言开发的工作项目中,我实现了个小需求,代码逻辑抽象出来大概是这样:
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
type Param struct {
Id int64
}
func test1() {
// 传递参数的管道
ch := make(chan *Param)
// 创建errgroup,便于实现主协程等待子协程的功能
gCtx, _ := context.WithTimeout(context.Background(), 1*time.Hour)
g, gCtx := errgroup.WithContext(gCtx)
// 起子协程往管道里不断传递参数
g.Go(func() error {
return send(ch)
})
// 在主协程里从管道中取出参数,然后对每个参数起一个子协程去处理
for param := range ch {
fmt.Printf("get param: %+v\n", param)
g.Go(func() error {
return dealParam(param)
})
}
// 等待所有子协程处理完毕
if err := g.Wait(); err != nil {
fmt.Printf("err: %+v\n", err)
}
}
func send(ch chan *Param) error {
for i := 1; ; i++ {
p := Param{Id: int64(i)}
ch <- &p
}
}
func dealParam(param *Param) error {
fmt.Printf("deal param: %+v\n", param)
return nil
}
业务逻辑在代码中都已经注释出了,这里为了方便演示,直接死循环生产参数流,实际的业务代码里参数流是有限的。在从管道中接到参数和处理参数的地方我打了日志,尝试运行一下后通过看这两处的日志可以发现问题。我的一次运行的输入如下:
从上面红框里的内容可以很明显地看到,id 为 71648 的参数从管道中获取了一次,但却被两个子协程处理了,而且再仔细观察会发现,71646 和 71648 都被处理了两次,而 71647 明明从管道中取出来了,却一次都没被处理。可能有些小伙伴已经猜到原因了,这不是重复执行的问题,而是主协程和子协程并发读写了某个变量,导致子协程读取到了被覆盖后的内容。
3. 本质
问题实际出在 for 循环这里:
for param := range ch {
fmt.Printf("get param: %+v\n", param)
g.Go(func() error {
return dealParam(param)
})
}
根据现象可以想象到,go 语言对 for 循环的实现是:
- 开辟出一块内存空间,作为临时变量 param;
- 每次从管道中取到一个值,覆盖到 param 这块内存空间里。
而 g.Go()
函数内部是起了个子协程去运行我们传入的函数的,这样一来,在我们的函数运行时,param 的值如果已经被后续的值覆盖了,那么前后两个子协程就拿到相同的 param 的值了,而之前那个没来得及处理的 param 值,就没机会得到处理了。
下面我们来验证一下。主要是验证 param 这个临时变量的内存地址在每次循环中是否是同一个,可以在打印语句中把 ¶m
的值打印出来,如下:
for param := range ch {
fmt.Printf("get param: %+v\n", ¶m)
g.Go(func() error {
return dealParam(param)
})
}
我一次运行的结果如下:
可以看出,临时变量 param 确实是同一块内存地址,不断覆盖。这里额外说一点,我们的函数 dealParam
里接收到的 param 参数,是复制了一个传进来的,并不是原来的内存地址。
4. 解决方案
知道了原因,解决就比较简单了,只要在 g.Go()
前,把 param 临时变量复制一份出来,把复制的变量传到子协程里就行了,因为复制的变量不会被覆盖,所以就避免了这个问题,代码如下:
for param := range ch {
// 复制param,防止param的值被后续的新值覆盖
tempParam := param
fmt.Printf("get param: %+v\n", tempParam)
g.Go(func() error {
return dealParam(tempParam)
})
}
再次运行,就可以发现没有上面的重复处理参数和漏处理参数的问题了:
这里还有个小细节:管道里传递的是参数对象的指针,而不是参数对象本身,这样在复制 param 时,就可以值复制一个指针,而不是复制一个对象,减少复制的开销。
当然,只有使用 errgroup 时才会有这个问题(下面会讲到),所以也可以直接使用 go
关键字起子协程,不用 errgroup,不过就需要自己做协程间的同步了。
5. 扩展问题
5.1 普通循环是否有问题
上面的 for 循环是从管道里取数据,如果直接对一个列表进行遍历,还会不会有这种情况呢?我们来实验一下,代码如下:
func test2() {
params := []*Param{}
for i := 0; i < 100; i++ {
params = append(params, &Param{Id: int64(i)})
}
gCtx, _ := context.WithTimeout(context.Background(), 1*time.Hour)
g, gCtx := errgroup.WithContext(gCtx)
for _, param := range params {
g.Go(func() error {
return dealParam(param)
})
}
if err := g.Wait(); err != nil {
fmt.Printf("err: %+v\n", err)
}
}
我一次运行的结果如下:
可以发现,问题更加严重,因为主协程执行更快,会有更多子协程拿到同样的 param 的值。所以,可以确定,这是 fo 语言里 for 循环实现的特性,和管道无关。只要是 for 循环创建协程,并传入了变量,就需要注意变量并发读写的问题。
5.2 go 关键字是否有问题
上面的例子中,我们都使用了 errgroup 的 API,如果不用 errgroup,直接 go
出子协程呢?还会有这种问题吗?我们来实验一下,代码如下:
func test3() {
params := []*Param{}
for i := 0; i < 100; i++ {
params = append(params, &Param{Id: int64(i)})
}
for _, param := range params {
go dealParam(param)
}
time.Sleep(10 * time.Second)
}
这里为了方便,直接在主协程里休眠了 10 秒,确保子协程都执行完,实际上应该用锁来实现协程间同步。运行之后会发现,没有上面的问题,可以合理猜测:使用 go dealParam(param)
这句代码时,复制一个新参数传给 dealParam()
的动作也是主协程里做的,所以没有并发读写 param
的问题,而用 errgroup 的 API g.Go()
时,复制一个新参数传给 dealParam()
的动作是在子协程里做的,所以会产生并发读写 param
的问题。
5.3 Java 是否有同样的问题
我写 Java 的时间更长,但好像从来没注意过这种问题,是有过这样的 bug 但我没注意,还是 Java 里就不存在这样的问题呢?我们来验证一下,代码如下:
import lombok.Builder;
import lombok.Data;
import java.util.concurrent.CountDownLatch;
public class Test {
public static void main(String[] args) throws InterruptedException {
Param[] params = new Param[1000];
for (int i = 0; i < 1000; i++) {
params[i] = Param.builder().id(i).build();
}
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (Param param : params) {
// System.identityHashCode可以打印对象的内存地址
System.out.println("get param: " + System.identityHashCode(param));
Worker worker = new Worker(param, countDownLatch);
new Thread(worker).start();
}
countDownLatch.await();
}
}
@Data
@Builder
class Param {
private Integer id;
}
class Worker implements Runnable {
private Param param;
private CountDownLatch countDownLatch;
public Worker(Param param, CountDownLatch countDownLatch) {
this.param = param;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("deal param: " + param.toString());
countDownLatch.countDown();
}
}
大家如果运行一下会发现,并没有上面的问题,而且打印出来的 param 临时变量的地址并不是同一个,可以合理猜测:Java 的 for 循环并不会创建一个临时变量接收循环到的值,而是直接用原对象做处理。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于