六、go-kit 微服务请求跟踪介绍

本贴最后更新于 2566 天前,其中的信息可能已经东海扬尘

介绍

go-kit 提供了两种 tracing 请求跟踪

1、opentracing【跟踪标准】

2、zipkin【zipkin 的 go 封装】
我们下面来介绍下 zipkin 在 go-kit 中的使用方法。

zipkin 安装启动

1、 java -version 现在安装 zipkin,必须使用 java8(即 java-1.8.0-openjdk)

2、 wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'

3、 java -jar zipkin.jar

哈哈, 只是大概介绍下 zipkin 的安装 出现问题需要自己解决了。

go-kit 的 zipkin

服务端 trace

//创建zipkin上报管理器 reporter := http.NewReporter("http://localhost:9411/api/v2/spans") // 运行结束,关闭上报管理器的for-select协程 defer reporter.Close() // 创建trace跟踪器 zkTracer, err := opzipkin.NewTracer(reporter) // 添加grpc请求的before after finalizer 事件对应要处理的trace操作方法 zkServerTrace := zipkin.GRPCServerTrace(zkTracer) // 通过options的方式运行trace bookListHandler := grpctransport.NewServer( bookListEndPoint, decodeRequest, encodeResponse, zkServerTrace, )

完整代码

我们还在之前的代码中加入,trace 的代码

package main import ( "grpc-test/pb" "context" grpctransport "github.com/go-kit/kit/transport/grpc" "github.com/go-kit/kit/endpoint" "google.golang.org/grpc" "net" "github.com/go-kit/kit/sd/etcdv3" "github.com/go-kit/kit/log" "time" "golang.org/x/time/rate" "github.com/go-kit/kit/ratelimit" opzipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/reporter/http" "github.com/go-kit/kit/tracing/zipkin" "math/rand" ) type BookServer struct { bookListHandler grpctransport.Handler bookInfoHandler grpctransport.Handler } //通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理 func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) { _, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in) if err != nil { return nil, err } return rsp.(*book.BookInfo),err } //通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理 func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) { _, rsp, err := s.bookListHandler.ServeGRPC(ctx, in) if err != nil { return nil, err } return rsp.(*book.BookList),err } //创建bookList的EndPoint func makeGetBookListEndpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { rand.Seed(time.Now().Unix()) randInt := rand.Int63n(200) time.Sleep( time.Duration(randInt) * time.Millisecond) //请求列表时返回 书籍列表 bl := new(book.BookList) bl.BookList = append(bl.BookList, &book.BookInfo{BookId:1,BookName:"21天精通php"}) bl.BookList = append(bl.BookList, &book.BookInfo{BookId:2,BookName:"21天精通java"}) return bl,nil } } //创建bookInfo的EndPoint func makeGetBookInfoEndpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { rand.Seed(time.Now().Unix()) randInt := rand.Int63n(200) time.Sleep( time.Duration(randInt) * time.Microsecond) //请求详情时返回 书籍信息 req := request.(*book.BookInfoParams) b := new(book.BookInfo) b.BookId = req.BookId b.BookName = "21天精通php" return b,nil } } func decodeRequest(_ context.Context, req interface{}) (interface{}, error) { return req, nil } func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) { return rsp, nil } func main() { var ( //etcd服务地址 etcdServer = "127.0.0.1:2379" //服务的信息目录 prefix = "/services/book/" //当前启动服务实例的地址 instance = "127.0.0.1:50051" //服务实例注册的路径 key = prefix + instance //服务实例注册的val value = instance ctx = context.Background() //服务监听地址 serviceAddress = ":50051" ) //etcd的连接参数 options := etcdv3.ClientOptions{ DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3, } //创建etcd连接 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options) if err != nil { panic(err) } // 创建注册器 registrar := etcdv3.NewRegistrar(client, etcdv3.Service{ Key: key, Value: value, }, log.NewNopLogger()) // 注册器启动注册 registrar.Register() reporter := http.NewReporter("http://localhost:9411/api/v2/spans") defer reporter.Close() zkTracer, err := opzipkin.NewTracer(reporter) zkServerTrace := zipkin.GRPCServerTrace(zkTracer) bookServer := new(BookServer) bookListEndPoint := makeGetBookListEndpoint() //创建限流器 1r/s limiter := rate.NewLimiter(rate.Every(time.Second * 1), 100000) //通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint bookListEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookListEndPoint) bookListHandler := grpctransport.NewServer( bookListEndPoint, decodeRequest, encodeResponse, zkServerTrace, ) bookServer.bookListHandler = bookListHandler bookInfoEndPoint := makeGetBookInfoEndpoint() //通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint) bookInfoHandler := grpctransport.NewServer( bookInfoEndPoint, decodeRequest, encodeResponse, zkServerTrace, ) bookServer.bookInfoHandler = bookInfoHandler ls, _ := net.Listen("tcp", serviceAddress) gs := grpc.NewServer(grpc.UnaryInterceptor(grpctransport.Interceptor)) book.RegisterBookServiceServer(gs, bookServer) gs.Serve(ls) }

客户端 trace

首先改造请求方式,之前我们是通过 grpc 的 idl 生成的代码直接发起的请求。我们要改成 go-kit endPoint 的方式,这样利于增加其它 middleware 或 options 扩展功能,通过 grpctransport.NewClient().Endpoint()可以获取到请求的 endPoint

原来的代码

//通过传入的 实例地址 创建对应的请求endPoint func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) { return func(ctx context.Context, request interface{}) (interface{}, error) { fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99")) conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure()) if err != nil { fmt.Println(err) panic("connect error") } defer conn.Close() bookClient := book.NewBookServiceClient(conn) bi, err := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1}) fmt.Println(bi) fmt.Println(" ", "获取书籍详情") fmt.Println(" ", "bookId: 1", " => ", "bookName:", bi.BookName) return nil, nil }, nil, nil }

改造后

//通过传入的 实例地址 创建对应的请求endPoint func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) { return func(ctx context.Context, request interface{}) (interface{}, error) { fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99")) conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure()) if err != nil { fmt.Println(err) panic("connect error") } bookInfoRequest := grpctransport.NewClient( conn, "BookService", "GetBookInfo", func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil }, func(_ context.Context, out interface{}) (interface{}, error) { return out, nil }, book.BookInfo{}, ).Endpoint() bookListRequest := grpctransport.NewClient( conn, "BookService", "GetBookList", func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil }, func(_ context.Context, out interface{}) (interface{}, error) { return out, nil }, book.BookList{}, ).Endpoint() infoRet, _ := bookInfoRequest(ctx, request) bi := infoRet.(*book.BookInfo) fmt.Println("获取书籍详情") fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName) listRet, _ := bookListRequest(ctx, request) bl := listRet.(*book.BookList) fmt.Println("获取书籍列表") for _, b := range bl.BookList { fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName) } return nil, nil }, nil, nil }

增加 trace 代码

与服务端 trace 的区别在于 kitzipkin.GRPCClientTrace

reporter := http.NewReporter("http://localhost:9411/api/v2/spans") defer reporter.Close() zkTracer, err := opzipkin.NewTracer(reporter) zkClientTrace := zipkin.GRPCClientTrace(zkTracer)

可以通过 span 组装 span 结构树

parentSpan := zkTracer.StartSpan("bookCaller") defer parentSpan.Flush() ctx = opzipkin.NewContext(context.Background(), parentSpan)

完整代码

package main import ( "context" "fmt" "github.com/afex/hystrix-go/hystrix" "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/etcdv3" "github.com/go-kit/kit/sd/lb" "github.com/go-kit/kit/tracing/zipkin" grpctransport "github.com/go-kit/kit/transport/grpc" opzipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/reporter/http" "google.golang.org/grpc" "grpc-test/pb" "io" "time" ) func main() { commandName := "my-endpoint" hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{ Timeout: 1000 * 30, ErrorPercentThreshold: 1, SleepWindow: 10000, MaxConcurrentRequests: 1000, RequestVolumeThreshold: 5, }) breakerMw := circuitbreaker.Hystrix(commandName) var ( //注册中心地址 etcdServer = "127.0.0.1:2379" //监听的服务前缀 prefix = "/services/book/" ctx = context.Background() ) options := etcdv3.ClientOptions{ DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3, } //连接注册中心 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options) if err != nil { panic(err) } logger := log.NewNopLogger() //创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据 instancer, err := etcdv3.NewInstancer(client, prefix, logger) if err != nil { panic(err) } //创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //创建负载均衡器 balancer := lb.NewRoundRobin(endpointer) /** 我们可以通过负载均衡器直接获取请求的endPoint,发起请求 reqEndPoint,_ := balancer.Endpoint() */ /** 也可以通过retry定义尝试次数进行请求 */ reqEndPoint := lb.Retry(3, 100*time.Second, balancer) //增加熔断中间件 reqEndPoint = breakerMw(reqEndPoint) //现在我们可以通过 endPoint 发起请求了 req := struct{}{} for i := 1; i <= 1; i++ { if _, err = reqEndPoint(ctx, req); err != nil { fmt.Println(err) } } } // 通过传入的 实例地址 创建对应的请求endPoint func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) { return func(ctx context.Context, request interface{}) (interface{}, error) { fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99")) conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure()) if err != nil { fmt.Println(err) panic("connect error") } reporter := http.NewReporter("http://localhost:9411/api/v2/spans") defer reporter.Close() zkTracer, err := opzipkin.NewTracer(reporter) zkClientTrace := zipkin.GRPCClientTrace(zkTracer) bookInfoRequest := grpctransport.NewClient( conn, "BookService", "GetBookInfo", func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil }, func(_ context.Context, out interface{}) (interface{}, error) { return out, nil }, book.BookInfo{}, zkClientTrace, ).Endpoint() bookListRequest := grpctransport.NewClient( conn, "BookService", "GetBookList", func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil }, func(_ context.Context, out interface{}) (interface{}, error) { return out, nil }, book.BookList{}, zkClientTrace, ).Endpoint() parentSpan := zkTracer.StartSpan("bookCaller") defer parentSpan.Flush() ctx = opzipkin.NewContext(ctx, parentSpan) infoRet, _ := bookInfoRequest(ctx, request) bi := infoRet.(*book.BookInfo) fmt.Println("获取书籍详情") fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName) listRet, _ := bookListRequest(ctx, request) bl := listRet.(*book.BookList) fmt.Println("获取书籍列表") for _, b := range bl.BookList { fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName) } return nil, nil }, nil, nil }

运行

请求服务: 127.0.0.1:50051 当前时间: 2018-05-04 10:34:55.91 获取书籍详情 bookId: 1 => bookName: 21天精通php 获取书籍列表 bookId: 1 => bookName: 21天精通php bookId: 2 => bookName: 21天精通java Process finished with exit code 0

访问 http://localhost:9411/zipkin/ 查看我们的 zipkin 中的记录
zkpng

总结

至此基于 go-kit 的功能基本都介绍了,log 模块和其它 middleware 差不多就不再描述。

  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    499 引用 • 1395 回帖 • 247 关注
  • gRpc
    11 引用 • 9 回帖 • 91 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • wzy via macOS

    client 端 getBookInfo 中请求是如何设置 bookId=1 的?