介绍
go-kit 提供了三种熔断
1、 gobreaker
2、 handy
3、 hystrix-go
hystrix 在 java 中用的比较多,我们来介绍下 go-kit 中 hystrix 的使用方法
go-kit 的 hystrix
Middleware 的实现
1、 Hystrix 返回 Middleware 此中间件会在原来的 endPoint 包一层 Hystrix 的 endPoint
2、 hystrix 通过传入的 commanName 获取对应的 Hystrix 的设置,并设置 run 失败时运行的 fallback 函数为 nil
3、 我们也可以自己实现 middleware 包装 endPoint
func Hystrix(commandName string) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
var resp interface{}
if err := hystrix.Do(commandName, func() (err error) {
resp, err = next(ctx, request)
return err
}, nil); err != nil {
return nil, err
}
return resp, nil
}
}
}
客户端 hystrix 配置
1、Timeout 【请求超时的时间】
2、ErrorPercentThreshold【允许出现的错误比例】
3、SleepWindow【熔断开启多久尝试发起一次请求】
4、MaxConcurrentRequests【允许的最大并发请求数】
5、RequestVolumeThreshold 【波动期内的最小请求数,默认波动期 10S】
commandName := "my-endpoint"
hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
Timeout: 1000 * 30,
ErrorPercentThreshold: 1,
SleepWindow: 10000,
MaxConcurrentRequests: 1000,
RequestVolumeThreshold: 5,
})
增加熔断中间件的包装
breakerMw := circuitbreaker.Hystrix(commandName)
// 增加熔断中间件
reqEndPoint = breakerMw(reqEndPoint)
完整代码
1、 关闭微服务的 service
2、 在之前我们的客户端代码增加熔断配置
3、 增加循环请求代码
package main
import (
"context"
"fmt"
"github.com/afex/hystrix-go/hystrix"
"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/etcdv3"
"github.com/go-kit/kit/sd/lb"
"google.golang.org/grpc"
"grpc-test/pb"
"io"
"time"
)
func main() {
commandName := "my-endpoint"
hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
Timeout: 1000 * 30,
ErrorPercentThreshold: 1,
SleepWindow: 10000,
MaxConcurrentRequests: 1000,
RequestVolumeThreshold: 5,
})
breakerMw := circuitbreaker.Hystrix(commandName)
var (
//注册中心地址
etcdServer = "127.0.0.1:2379"
//监听的服务前缀
prefix = "/services/book/"
ctx = context.Background()
)
options := etcdv3.ClientOptions{
DialTimeout: time.Second * 3,
DialKeepAlive: time.Second * 3,
}
//连接注册中心
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
if err != nil {
panic(err)
}
logger := log.NewNopLogger()
//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
instancer, err := etcdv3.NewInstancer(client, prefix, logger)
if err != nil {
panic(err)
}
//创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
endpointer := sd.NewEndpointer(instancer, reqFactory, logger)
//创建负载均衡器
balancer := lb.NewRoundRobin(endpointer)
/**
我们可以通过负载均衡器直接获取请求的endPoint,发起请求
reqEndPoint,_ := balancer.Endpoint()
*/
/**
也可以通过retry定义尝试次数进行请求
*/
reqEndPoint := lb.Retry(3, 100*time.Second, balancer)
//增加熔断中间件
reqEndPoint = breakerMw(reqEndPoint)
//现在我们可以通过 endPoint 发起请求了
req := struct{}{}
for i := 1; i <= 20; i++ {
if _, err = reqEndPoint(ctx, req); err != nil {
fmt.Println("当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
fmt.Println(err)
time.sleep(1 * time.Second)
}
}
}
// 通过传入的 实例地址 创建对应的请求endPoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
return func(ctx context.Context, request interface{}) (interface{}, error) {
fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
panic("connect error")
}
defer conn.Close()
bookClient := book.NewBookServiceClient(conn)
bi, err := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
fmt.Println(bi)
fmt.Println(" ", "获取书籍详情")
fmt.Println(" ", "bookId: 1", " => ", "bookName:", bi.BookName)
return nil, nil
}, nil, nil
}
输出记录
当前时间: 2018-05-02 16:10:19.71
no endpoints available (previously: no endpoints available; no endpoints available)
当前时间: 2018-05-02 16:10:20.71
no endpoints available (previously: no endpoints available; no endpoints available)
当前时间: 2018-05-02 16:10:21.71
no endpoints available (previously: no endpoints available; no endpoints available)
当前时间: 2018-05-02 16:10:22.71
no endpoints available (previously: no endpoints available; no endpoints available)
当前时间: 2018-05-02 16:10:23.71
no endpoints available (previously: no endpoints available; no endpoints available)
2018/05/02 16:10:24 hystrix-go: opening circuit my-endpoint
当前时间: 2018-05-02 16:10:24.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:25.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:26.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:27.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:28.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:29.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:30.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:31.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:32.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:33.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:34.71
no endpoints available (previously: no endpoints available; no endpoints available)
2018/05/02 16:10:34 hystrix-go: allowing single test to possibly close circuit my-endpoint
当前时间: 2018-05-02 16:10:35.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:36.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:37.71
hystrix: circuit open
当前时间: 2018-05-02 16:10:38.71
hystrix: circuit open
Process finished with exit code 0
通过上面的输出记录可以验证我们的配置:
1、 前 5 条波动期内的错误,没有触发 circuit 开启
2、 circuit 开启后请求熔断生效
3、 circuit 开启 10S 后,SleepWindow 测试发起请求设置生效
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于