go 语言中 for 循环的并发安全问题

本贴最后更新于 1527 天前,其中的信息可能已经水流花落

hello,大家好,欢迎来到银之庭。我是 Z,一个普通的程序员。今天我们来看一个我在工作中刚发现的 go 语言里 for 循环的一个问题。

1. 结论

先说结论,用尽可能简练的语言描述就是:在 go 语言中,用 for 循环创建子协程,用 errgroup 管理这些协程,并会向子协程中传递参数的情况下,有可能产生并发安全问题,需要复制一下循环中的临时变量传给子协程,而不能直接把临时变量传过去。

2. 现象

在某个 go 语言开发的工作项目中,我实现了个小需求,代码逻辑抽象出来大概是这样:

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)

type Param struct {
	Id int64
}

func test1() {
	// 传递参数的管道
	ch := make(chan *Param)

	// 创建errgroup,便于实现主协程等待子协程的功能
	gCtx, _ := context.WithTimeout(context.Background(), 1*time.Hour)
	g, gCtx := errgroup.WithContext(gCtx)

	// 起子协程往管道里不断传递参数
	g.Go(func() error {
		return send(ch)
	})

	// 在主协程里从管道中取出参数,然后对每个参数起一个子协程去处理
	for param := range ch {
		fmt.Printf("get param: %+v\n", param)
		g.Go(func() error {
			return dealParam(param)
		})
	}

	// 等待所有子协程处理完毕
	if err := g.Wait(); err != nil {
		fmt.Printf("err: %+v\n", err)
	}

}

func send(ch chan *Param) error {
	for i := 1; ; i++ {
		p := Param{Id: int64(i)}
		ch <- &p
	}
}

func dealParam(param *Param) error {
	fmt.Printf("deal param: %+v\n", param)

	return nil
}

业务逻辑在代码中都已经注释出了,这里为了方便演示,直接死循环生产参数流,实际的业务代码里参数流是有限的。在从管道中接到参数和处理参数的地方我打了日志,尝试运行一下后通过看这两处的日志可以发现问题。我的一次运行的输入如下:

image.png

从上面红框里的内容可以很明显地看到,id 为 71648 的参数从管道中获取了一次,但却被两个子协程处理了,而且再仔细观察会发现,71646 和 71648 都被处理了两次,而 71647 明明从管道中取出来了,却一次都没被处理。可能有些小伙伴已经猜到原因了,这不是重复执行的问题,而是主协程和子协程并发读写了某个变量,导致子协程读取到了被覆盖后的内容。

3. 本质

问题实际出在 for 循环这里:

for param := range ch {
	fmt.Printf("get param: %+v\n", param)
	g.Go(func() error {
		return dealParam(param)
	})
}

根据现象可以想象到,go 语言对 for 循环的实现是:

  1. 开辟出一块内存空间,作为临时变量 param;
  2. 每次从管道中取到一个值,覆盖到 param 这块内存空间里。

g.Go() 函数内部是起了个子协程去运行我们传入的函数的,这样一来,在我们的函数运行时,param 的值如果已经被后续的值覆盖了,那么前后两个子协程就拿到相同的 param 的值了,而之前那个没来得及处理的 param 值,就没机会得到处理了。

下面我们来验证一下。主要是验证 param 这个临时变量的内存地址在每次循环中是否是同一个,可以在打印语句中把 &param 的值打印出来,如下:

for param := range ch {
	fmt.Printf("get param: %+v\n", &param)
	g.Go(func() error {
		return dealParam(param)
	})
}

我一次运行的结果如下:

image.png

可以看出,临时变量 param 确实是同一块内存地址,不断覆盖。这里额外说一点,我们的函数 dealParam 里接收到的 param 参数,是复制了一个传进来的,并不是原来的内存地址。

4. 解决方案

知道了原因,解决就比较简单了,只要在 g.Go() 前,把 param 临时变量复制一份出来,把复制的变量传到子协程里就行了,因为复制的变量不会被覆盖,所以就避免了这个问题,代码如下:

for param := range ch {
	// 复制param,防止param的值被后续的新值覆盖
	tempParam := param
	fmt.Printf("get param: %+v\n", tempParam)
	g.Go(func() error {
		return dealParam(tempParam)
	})
}

再次运行,就可以发现没有上面的重复处理参数和漏处理参数的问题了:

image.png

这里还有个小细节:管道里传递的是参数对象的指针,而不是参数对象本身,这样在复制 param 时,就可以值复制一个指针,而不是复制一个对象,减少复制的开销。

当然,只有使用 errgroup 时才会有这个问题(下面会讲到),所以也可以直接使用 go 关键字起子协程,不用 errgroup,不过就需要自己做协程间的同步了。

5. 扩展问题

5.1 普通循环是否有问题

上面的 for 循环是从管道里取数据,如果直接对一个列表进行遍历,还会不会有这种情况呢?我们来实验一下,代码如下:

func test2() {
	params := []*Param{}
	for i := 0; i < 100; i++ {
		params = append(params, &Param{Id: int64(i)})
	}

	gCtx, _ := context.WithTimeout(context.Background(), 1*time.Hour)
	g, gCtx := errgroup.WithContext(gCtx)

	for _, param := range params {
		g.Go(func() error {
			return dealParam(param)
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("err: %+v\n", err)
	}
}

我一次运行的结果如下:

image.png

可以发现,问题更加严重,因为主协程执行更快,会有更多子协程拿到同样的 param 的值。所以,可以确定,这是 fo 语言里 for 循环实现的特性,和管道无关只要是 for 循环创建协程,并传入了变量,就需要注意变量并发读写的问题

5.2 go 关键字是否有问题

上面的例子中,我们都使用了 errgroup 的 API,如果不用 errgroup,直接 go 出子协程呢?还会有这种问题吗?我们来实验一下,代码如下:

func test3() {
	params := []*Param{}
	for i := 0; i < 100; i++ {
		params = append(params, &Param{Id: int64(i)})
	}

	for _, param := range params {
		go dealParam(param)
	}

	time.Sleep(10 * time.Second)
}

这里为了方便,直接在主协程里休眠了 10 秒,确保子协程都执行完,实际上应该用锁来实现协程间同步。运行之后会发现,没有上面的问题,可以合理猜测:使用 go dealParam(param) 这句代码时,复制一个新参数传给 dealParam() 的动作也是主协程里做的,所以没有并发读写 param 的问题,而用 errgroup 的 API g.Go() 时,复制一个新参数传给 dealParam() 的动作是在子协程里做的,所以会产生并发读写 param 的问题

5.3 Java 是否有同样的问题

我写 Java 的时间更长,但好像从来没注意过这种问题,是有过这样的 bug 但我没注意,还是 Java 里就不存在这样的问题呢?我们来验证一下,代码如下:

import lombok.Builder;
import lombok.Data;

import java.util.concurrent.CountDownLatch;

public class Test {

    public static void main(String[] args) throws InterruptedException {
        Param[] params = new Param[1000];
        for (int i = 0; i < 1000; i++) {
            params[i] = Param.builder().id(i).build();
        }

        CountDownLatch countDownLatch = new CountDownLatch(1000);
        for (Param param : params) {
	    // System.identityHashCode可以打印对象的内存地址
            System.out.println("get param: " + System.identityHashCode(param));
            Worker worker = new Worker(param, countDownLatch);
            new Thread(worker).start();
        }

        countDownLatch.await();
    }
}

@Data
@Builder
class Param {
    private Integer id;
}

class Worker implements Runnable {

    private Param param;

    private CountDownLatch countDownLatch;

    public Worker(Param param, CountDownLatch countDownLatch) {
        this.param = param;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println("deal param: " + param.toString());
        countDownLatch.countDown();
    }
}

大家如果运行一下会发现,并没有上面的问题,而且打印出来的 param 临时变量的地址并不是同一个,可以合理猜测:Java 的 for 循环并不会创建一个临时变量接收循环到的值,而是直接用原对象做处理

  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    497 引用 • 1388 回帖 • 279 关注
  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...