介绍
go-kit 提供了三种熔断
1、 gobreaker
2、 handy
3、 hystrix-go
hystrix 在 java 中用的比较多,我们来介绍下 go-kit 中 hystrix 的使用方法
go-kit 的 hystrix
Middleware 的实现
1、 Hystrix 返回 Middleware 此中间件会在原来的 endPoint 包一层 Hystrix 的 endPoint
2、 hystrix 通过传入的 commanName 获取对应的 Hystrix 的设置,并设置 run 失败时运行的 fallback 函数为 nil
3、 我们也可以自己实现 middleware 包装 endPoint
func Hystrix(commandName string) endpoint.Middleware { return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { var resp interface{} if err := hystrix.Do(commandName, func() (err error) { resp, err = next(ctx, request) return err }, nil); err != nil { return nil, err } return resp, nil } } }
客户端 hystrix 配置
1、Timeout 【请求超时的时间】
2、ErrorPercentThreshold【允许出现的错误比例】
3、SleepWindow【熔断开启多久尝试发起一次请求】
4、MaxConcurrentRequests【允许的最大并发请求数】
5、RequestVolumeThreshold 【波动期内的最小请求数,默认波动期 10S】
commandName := "my-endpoint" hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{ Timeout: 1000 * 30, ErrorPercentThreshold: 1, SleepWindow: 10000, MaxConcurrentRequests: 1000, RequestVolumeThreshold: 5, })
增加熔断中间件的包装
breakerMw := circuitbreaker.Hystrix(commandName) // 增加熔断中间件 reqEndPoint = breakerMw(reqEndPoint)
完整代码
1、 关闭微服务的 service
2、 在之前我们的客户端代码增加熔断配置
3、 增加循环请求代码
package main import ( "context" "fmt" "github.com/afex/hystrix-go/hystrix" "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/etcdv3" "github.com/go-kit/kit/sd/lb" "google.golang.org/grpc" "grpc-test/pb" "io" "time" ) func main() { commandName := "my-endpoint" hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{ Timeout: 1000 * 30, ErrorPercentThreshold: 1, SleepWindow: 10000, MaxConcurrentRequests: 1000, RequestVolumeThreshold: 5, }) breakerMw := circuitbreaker.Hystrix(commandName) var ( //注册中心地址 etcdServer = "127.0.0.1:2379" //监听的服务前缀 prefix = "/services/book/" ctx = context.Background() ) options := etcdv3.ClientOptions{ DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3, } //连接注册中心 client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options) if err != nil { panic(err) } logger := log.NewNopLogger() //创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据 instancer, err := etcdv3.NewInstancer(client, prefix, logger) if err != nil { panic(err) } //创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //创建负载均衡器 balancer := lb.NewRoundRobin(endpointer) /** 我们可以通过负载均衡器直接获取请求的endPoint,发起请求 reqEndPoint,_ := balancer.Endpoint() */ /** 也可以通过retry定义尝试次数进行请求 */ reqEndPoint := lb.Retry(3, 100*time.Second, balancer) //增加熔断中间件 reqEndPoint = breakerMw(reqEndPoint) //现在我们可以通过 endPoint 发起请求了 req := struct{}{} for i := 1; i <= 20; i++ { if _, err = reqEndPoint(ctx, req); err != nil { fmt.Println("当前时间: ", time.Now().Format("2006-01-02 15:04:05.99")) fmt.Println(err) time.sleep(1 * time.Second) } } } // 通过传入的 实例地址 创建对应的请求endPoint func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) { return func(ctx context.Context, request interface{}) (interface{}, error) { fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99")) conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure()) if err != nil { fmt.Println(err) panic("connect error") } defer conn.Close() bookClient := book.NewBookServiceClient(conn) bi, err := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1}) fmt.Println(bi) fmt.Println(" ", "获取书籍详情") fmt.Println(" ", "bookId: 1", " => ", "bookName:", bi.BookName) return nil, nil }, nil, nil }
输出记录
当前时间: 2018-05-02 16:10:19.71 no endpoints available (previously: no endpoints available; no endpoints available) 当前时间: 2018-05-02 16:10:20.71 no endpoints available (previously: no endpoints available; no endpoints available) 当前时间: 2018-05-02 16:10:21.71 no endpoints available (previously: no endpoints available; no endpoints available) 当前时间: 2018-05-02 16:10:22.71 no endpoints available (previously: no endpoints available; no endpoints available) 当前时间: 2018-05-02 16:10:23.71 no endpoints available (previously: no endpoints available; no endpoints available) 2018/05/02 16:10:24 hystrix-go: opening circuit my-endpoint 当前时间: 2018-05-02 16:10:24.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:25.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:26.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:27.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:28.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:29.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:30.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:31.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:32.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:33.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:34.71 no endpoints available (previously: no endpoints available; no endpoints available) 2018/05/02 16:10:34 hystrix-go: allowing single test to possibly close circuit my-endpoint 当前时间: 2018-05-02 16:10:35.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:36.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:37.71 hystrix: circuit open 当前时间: 2018-05-02 16:10:38.71 hystrix: circuit open Process finished with exit code 0
通过上面的输出记录可以验证我们的配置:
1、 前 5 条波动期内的错误,没有触发 circuit 开启
2、 circuit 开启后请求熔断生效
3、 circuit 开启 10S 后,SleepWindow 测试发起请求设置生效
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于