微服务如何保证不会出现连锁反应?Go 实现的断路器了解下( 二 )

circuit.ClosedToOpentype ClosedToOpen interface {RunMetrics// 统计调用出现的各种情况Metrics// 统计状态切换的情况// 当出现ErrFailure和ErrTimeout的失败调用时 , 会调用ShouldOpen , ShouldOpen会根据RunMetrics信息决定是否切换到“打开”状态ShouldOpen(now time.Time) bool// 即使断路器处于“关闭”状态 , 也希望能阻止调用Prevent(now time.Time) bool}circuit/v3/closers/hystrix/opener.go 是 hystrix 的默认实现 , 实现了 circuit.ClosedToOpen 接口 。
type Opener struct {errorsCountfaststats.RollingCounter // 统计调用出现的ErrFailure和ErrTimeout的情况legitimateAttemptsCount faststats.RollingCounter // 统计调用出现的ErrFailure和ErrTimeout , 以及Success的情况errorPercentagefaststats.AtomicInt64 // 错误阈值requestVolumeThreshold faststats.AtomicInt64 // 如果在一段时间窗口内的调用次数小于该阈值 , 则不会将断路器切换到“打开”状态musync.Mutex // 修改config时的互斥锁config ConfigureOpener}// faststats.RollingCounter 是滑动窗口计数器 , 用于统计一段时间窗口内 , 对每个时间片上发生的事件进行计数circuit.OpenToClosedtype OpenToClosed interface {RunMetrics// 统计调用出现的各种情况Metrics// 统计状态切换的情况// 当调用成功时 , 会调用ShouldClose , ShouldClose会根据RunMetrics信息决定是否切换到“关闭”状态ShouldClose(now time.Time) bool// “半打开”状态的实现 , 用于在断路器处于“打开”状态时 , 允许部分调用执行Allow(now time.Time) bool}circuit/v3/closers/hystrix/closer.go 是 hystrix 的默认实现 , 实现了 circuit.OpenToClosed 接口 。
type Closer struct {// 当断路器处于“打开”状态时 , 定时放行部分调用 , 被OpenToClosed.Allow调用reopenCircuitCheck faststats.TimedCheckconcurrentSuccessfulAttempts faststats.AtomicInt64 // 当调用为Success时 , 加1 , 当调用为ErrFailure、ErrTimeout时 , 重置为0closeOnCurrentCountfaststats.AtomicInt64 // 切换到“关闭”状态的阈值 , 在OpenToClosed.ShouldClose中使用musync.Mutex // 修改config时的互斥锁config ConfigureCloser}cep21/circuit 实现的断路器模式有限状态机图解
微服务如何保证不会出现连锁反应?Go 实现的断路器了解下文章插图
异常处理func (c *Circuit) Execute(ctx context.Context, runFunc func(context.Context) error, fallbackFunc func(context.Context, error) error) error 方法返回的 error 可能的情况有:

  • runFunc 返回的 error , 包括 circuit.BadRequest
  • circuit 返回的 - 当状态为“打开”: circuit.circuitError{concurrencyLimitReached: true, msg: "circuit is open"} - 当并发请求超过阈值:- runFunc 并发超过阈值: circuit.circuitError{concurrencyLimitReached: true, msg: "throttling connections to command"} - fallbackFunc 并发超过阈值: circuit.circuitError{circuitOpen: true, msg: "throttling concurrency to fallbacks"}
  • fallbackFunc 返回的 error
  • nil
其中 fallbackFunc 的 error 参数可能为:
  • runFunc 返回的 error , 除了 circuit.BadRequest
  • circuit 返回的
    • runFunc 并发超过阈值: circuit.circuitError{concurrencyLimitReached: true, msg: "throttling connections to command"}
    • 当状态为“打开”: circuit.circuitError{concurrencyLimitReached: true, msg: "circuit is open"}
    • 当并发请求超过阈值:
  • nil
业务系统中的实际应用在我的业务系统中 ,OpenToClosed 接口的实现 , 不打算使用 hystrix 默认的进入“半打开”的逻辑:定时放行部分调用 。 因为这样可能会影响到上游的业务请求 , 并且在 fallbackFunc 中 , 我会去调用异地热备服务 。 所以进入“半打开”状态 , 我选择自己实现 OpenToClosed 接口 , 策略如下:
  • 当断路器进入“打开”状态时 , 启动下游服务健康检查定时器 , 通过模拟业务请求调用下游服务
    • 如果调用成功了 , 并且累积成功调用次数达到一定阈值 , 此时 OpenToClosed.Allow(now time.Time) bool 根据一定概率返回 true , 断路器进入“半打开”状态
    • 如果调用失败了 ,OpenToClosed.Allow(now time.Time) bool 返回 false
  • 当断路器进入“半打开”状态
    • 如果调用成功了 , 进行计数 , 达到阈值后 , 断路器进入“关闭”状态 , 并且停止下游健康检查定时器
    • 如果调用失败了 , 断路器回到“打开”状态
实现代码:
package circuitimport ("math/rand""sync""time""github.com/cep21/circuit""github.com/cep21/circuit/v3/faststats")type ConfigureCloser struct {CloseOnSuccessfulAttemptsCount int64ReopenHealthCheck*HealthCheck}type Closer struct {reopenHealthCheck *HealthChecksuccessfulAttemptsfaststats.AtomicInt64closeOnSuccessfulAttemptsCount int64}func NewCloser(config ConfigureCloser) circuit.OpenToClosed {return &Closer{reopenHealthCheck:config.ReopenHealthCheck,closeOnSuccessfulAttemptsCount: config.CloseOnSuccessfulAttemptsCount,}}// start health check when circuit is openedfunc (c *Closer) Opened(now time.Time) {c.reopenHealthCheck.start()}// stop health check when circuit is closedfunc (c *Closer) Closed(now time.Time) {c.reopenHealthCheck.stop()}// half-openfunc (c *Closer) Allow(now time.Time) bool {return c.reopenHealthCheck.ok()}func (c *Closer) Success(now time.Time, duration time.Duration) {c.successfulAttempts.Add(1)}func (c *Closer) ErrBadRequest(now time.Time, duration time.Duration) {}func (c *Closer) ErrInterrupt(now time.Time, duration time.Duration) {}func (c *Closer) ErrConcurrencyLimitReject(now time.Time) {}func (c *Closer) ErrShortCircuit(now time.Time) {}func (c *Closer) ErrFailure(now time.Time, duration time.Duration) {c.successfulAttempts.Set(0)c.reopenHealthCheck.reset()}func (c *Closer) ErrTimeout(now time.Time, duration time.Duration) {c.successfulAttempts.Set(0)c.reopenHealthCheck.reset()}func (c *Closer) ShouldClose(now time.Time) bool {return c.successfulAttempts.Get() > c.closeOnSuccessfulAttemptsCount}type ConfigureHealthCheck struct {TickDurationtime.DurationRunfunc() boolThresholdint64AllowProbability int}type HealthCheck struct {runningboolstopSignalCh chan struct{}countfaststats.AtomicInt64musync.MutexconfigConfigureHealthCheck}func NewHealthCheck(config ConfigureHealthCheck) *HealthCheck {return &HealthCheck{stopSignalCh: make(chan struct{}),config:config,}}func (h *HealthCheck) start() {h.mu.Lock()defer h.mu.Unlock()if h.running {return}h.running = trueh.count.Set(0)go func() {tick := time.Tick(h.config.TickDuration)for {select {case