三、go-kit 与 grpc 结合实现注册发现与负载均衡

本贴最后更新于 2434 天前,其中的信息可能已经沧海桑田

介绍

grpc 提供了简单的负载均衡,需要自己实现服务发现 resolve。我们既然要使用 go-kit 来治理微服务,那么我们就使用 go-kit 的注册发现、负载均衡机制。

go-kit 官方【stringsvc3】例子中使用的负载均衡方案是通过服务端转发进行,翻找下源码 go-kit 的服务注册发现、负载均衡在【sd】包中。下面我们介绍怎么通过 go-kit 进行客户端负载均衡。

go-kit 提供的注册中心

1、 etcd

2、 consul

3、 eureka

4、 zookeeper

go-kit 提供的负载均衡

1、 random[随机]

2、 roundRobin[轮询]

只需实现 Balancer 接口,我们可以很容易的增加其它负载均衡机制

type Balancer interface {
	Endpoint() (endpoint.Endpoint, error)
}

etcd 注册发现

etcd 和 zookeeper 类似是一个高可用、强一致性的存储仓库,拥有服务发现功能。 我们就通过 go-kit 提供的 etcd 包来实现服务注册发现

服务端代码

服务注册

1、连接注册中心

2、注册当前服务

var (
	//etcd服务地址
	etcdServer = "127.0.0.1:2379"
	//服务的信息目录
	prefix = "/services/book/"
	//当前启动服务实例的地址
	instance = "127.0.0.1:50052"
	//服务实例注册的路径
	key = prefix + instance
	//服务实例注册的val
	value = instance
	ctx   = context.Background()
	//服务监听地址
	serviceAddress = ":50052"
)

// etcd的连接参数
options := etcdv3.ClientOptions{
	DialTimeout:   time.Second * 3,
	DialKeepAlive: time.Second * 3,
}
// 创建etcd连接
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
if err != nil {
	panic(err)
}

// 创建注册器
registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
	Key:   key,
	Value: value,
}, log.NewNopLogger())

// 注册器启动注册
registrar.Register()

完整代码

package main

import (
	"context"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd/etcdv3"
	grpc_transport "github.com/go-kit/kit/transport/grpc"
	"google.golang.org/grpc"
	"grpc-test/pb"
	"net"
	"time"
)

type BookServer struct {
	bookListHandler grpc_transport.Handler
	bookInfoHandler grpc_transport.Handler
}

// 通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
	_, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
	if err != nil {
		return nil, err
	}
	return rsp.(*book.BookInfo), err
}

// 通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
	_, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
	if err != nil {
		return nil, err
	}
	return rsp.(*book.BookList), err
}

// 创建bookList的EndPoint
func makeGetBookListEndpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		//请求列表时返回 书籍列表
		bl := new(book.BookList)
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 1, BookName: "21天精通php"})
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "21天精通java"})
		return bl, nil
	}
}

// 创建bookInfo的EndPoint
func makeGetBookInfoEndpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		//请求详情时返回 书籍信息
		req := request.(*book.BookInfoParams)
		b := new(book.BookInfo)
		b.BookId = req.BookId
		b.BookName = "21天精通php"
		return b, nil
	}
}

func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
	return req, nil
}

func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
	return rsp, nil
}

func main() {
	var (
		//etcd服务地址
		etcdServer = "127.0.0.1:2379"
		//服务的信息目录
		prefix = "/services/book/"
		//当前启动服务实例的地址
		instance = "127.0.0.1:50052"
		//服务实例注册的路径
		key = prefix + instance
		//服务实例注册的val
		value = instance
		ctx   = context.Background()
		//服务监听地址
		serviceAddress = ":50052"
	)

	//etcd的连接参数
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	//创建etcd连接
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}

	// 创建注册器
	registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
		Key:   key,
		Value: value,
	}, log.NewNopLogger())

	// 注册器启动注册
	registrar.Register()

	bookServer := new(BookServer)
	bookListHandler := grpc_transport.NewServer(
		makeGetBookListEndpoint(),
		decodeRequest,
		encodeResponse,
	)
	bookServer.bookListHandler = bookListHandler

	bookInfoHandler := grpc_transport.NewServer(
		makeGetBookInfoEndpoint(),
		decodeRequest,
		encodeResponse,
	)
	bookServer.bookInfoHandler = bookInfoHandler

	ls, _ := net.Listen("tcp", serviceAddress)
	gs := grpc.NewServer(grpc.UnaryInterceptor(grpc_transport.Interceptor))
	book.RegisterBookServiceServer(gs, bookServer)
	gs.Serve(ls)
}

客户端代码

客户端流程

1、 连接注册中心

2、 获取提供的服务

3、 监听服务目录变化,目录变化更新本地缓存

4、 创建负载均衡器

5、 获取请求的 endPoint

完整代码

package main

import (
	"context"
	"fmt"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/etcdv3"
	"github.com/go-kit/kit/sd/lb"
	"google.golang.org/grpc"
	"grpc-test/pb"
	"io"
	"time"
)

func main() {

	var (
		//注册中心地址
		etcdServer = "127.0.0.1:2379"
		//监听的服务前缀
		prefix = "/services/book/"
		ctx    = context.Background()
	)
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	//连接注册中心
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}
	logger := log.NewNopLogger()
	//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
	instancer, err := etcdv3.NewInstancer(client, prefix, logger)
	if err != nil {
		panic(err)
	}
	//创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
	endpointer := sd.NewEndpointer(instancer, reqFactory, logger)
	//创建负载均衡器
	balancer := lb.NewRoundRobin(endpointer)

	/**
	  我们可以通过负载均衡器直接获取请求的endPoint,发起请求
	  reqEndPoint,_ := balancer.Endpoint()
	*/

	/**
	  也可以通过retry定义尝试次数进行请求
	*/
	reqEndPoint := lb.Retry(3, 3*time.Second, balancer)

	//现在我们可以通过 endPoint 发起请求了
	req := struct{}{}
	if _, err = reqEndPoint(ctx, req); err != nil {
		panic(err)
	}
}

// 通过传入的 实例地址  创建对应的请求endPoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		fmt.Println("请求服务: ", instanceAddr)
		conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
		if err != nil {
			fmt.Println(err)
			panic("connect error")
		}
		defer conn.Close()
		bookClient := book.NewBookServiceClient(conn)
		bi, _ := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
		fmt.Println("获取书籍详情")
		fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)

		bl, _ := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
		fmt.Println("获取书籍列表")
		for _, b := range bl.BookList {
			fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
		}
		return nil, nil
	}, nil, nil
}

测试

请求测试

请求服务: 127.0.0.1:50052

获取书籍详情
bookId: 1  =>  bookName: 21天精通php
获取书籍列表
bookId: 1  =>  bookName: 21天精通php
bookId: 2  =>  bookName: 21天精通java

负载均衡测试

1、 修改 server 的注册监听端口,启动多个 server

instance = "127.0.0.1:50052"
serviceAddress = ":50052"

2、client 发起多次请求

req := struct{}{}
for i := 1; i <= 8; i++ {
	if _, err = reqEndPoint(ctx, req); err != nil {
		panic(err)
	}
}

通过返回结果中记录的请求地址,我们可以看到已经按照轮询的方式请求不同的微服务实例。

请求服务:  127.0.0.1:50051

        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50052
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50051
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50052
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50051
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50052
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50051
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java
请求服务:  127.0.0.1:50052
        获取书籍详情
        bookId: 1  =>  bookName: 21天精通php
        获取书籍列表
        bookId: 1  =>  bookName: 21天精通php
        bookId: 2  =>  bookName: 21天精通java

Process finished with exit code 0
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    497 引用 • 1388 回帖 • 278 关注
  • gRpc
    11 引用 • 9 回帖 • 71 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • kwame

    grpc-test/pb 文件呢?

  • 其他回帖
  • kwame

    grpc-test/pb 文件呢?😄 找到了,看前面一贴