微服务如何保证不会出现连锁反应?Go 实现的断路器了解下( 二 )
circuit.ClosedToOpentype ClosedToOpen interface {RunMetrics// 统计调用出现的各种情况Metrics// 统计状态切换的情况// 当出现ErrFailure和ErrTimeout的失败调用时 , 会调用ShouldOpen , ShouldOpen会根据RunMetrics信息决定是否切换到“打开”状态ShouldOpen(now time.Time) bool// 即使断路器处于“关闭”状态 , 也希望能阻止调用Prevent(now time.Time) bool}
circuit/v3/closers/hystrix/opener.go 是 hystrix 的默认实现 , 实现了 circuit.ClosedToOpen 接口 。
type Opener struct {errorsCountfaststats.RollingCounter // 统计调用出现的ErrFailure和ErrTimeout的情况legitimateAttemptsCount faststats.RollingCounter // 统计调用出现的ErrFailure和ErrTimeout , 以及Success的情况errorPercentagefaststats.AtomicInt64 // 错误阈值requestVolumeThreshold faststats.AtomicInt64 // 如果在一段时间窗口内的调用次数小于该阈值 , 则不会将断路器切换到“打开”状态musync.Mutex // 修改config时的互斥锁config ConfigureOpener}// faststats.RollingCounter 是滑动窗口计数器 , 用于统计一段时间窗口内 , 对每个时间片上发生的事件进行计数
circuit.OpenToClosedtype OpenToClosed interface {RunMetrics// 统计调用出现的各种情况Metrics// 统计状态切换的情况// 当调用成功时 , 会调用ShouldClose , ShouldClose会根据RunMetrics信息决定是否切换到“关闭”状态ShouldClose(now time.Time) bool// “半打开”状态的实现 , 用于在断路器处于“打开”状态时 , 允许部分调用执行Allow(now time.Time) bool}
circuit/v3/closers/hystrix/closer.go 是 hystrix 的默认实现 , 实现了 circuit.OpenToClosed 接口 。
type Closer struct {// 当断路器处于“打开”状态时 , 定时放行部分调用 , 被OpenToClosed.Allow调用reopenCircuitCheck faststats.TimedCheckconcurrentSuccessfulAttempts faststats.AtomicInt64 // 当调用为Success时 , 加1 , 当调用为ErrFailure、ErrTimeout时 , 重置为0closeOnCurrentCountfaststats.AtomicInt64 // 切换到“关闭”状态的阈值 , 在OpenToClosed.ShouldClose中使用musync.Mutex // 修改config时的互斥锁config ConfigureCloser}
cep21/circuit 实现的断路器模式有限状态机图解
文章插图
异常处理func (c *Circuit) Execute(ctx context.Context, runFunc func(context.Context) error, fallbackFunc func(context.Context, error) error) error 方法返回的 error 可能的情况有:
- runFunc 返回的 error , 包括 circuit.BadRequest
- circuit 返回的 - 当状态为“打开”: circuit.circuitError{concurrencyLimitReached: true, msg: "circuit is open"} - 当并发请求超过阈值:- runFunc 并发超过阈值: circuit.circuitError{concurrencyLimitReached: true, msg: "throttling connections to command"} - fallbackFunc 并发超过阈值: circuit.circuitError{circuitOpen: true, msg: "throttling concurrency to fallbacks"}
- fallbackFunc 返回的 error
- nil
- runFunc 返回的 error , 除了 circuit.BadRequest
- circuit 返回的
- runFunc 并发超过阈值: circuit.circuitError{concurrencyLimitReached: true, msg: "throttling connections to command"}
- 当状态为“打开”: circuit.circuitError{concurrencyLimitReached: true, msg: "circuit is open"}
- 当并发请求超过阈值:
- nil
- 当断路器进入“打开”状态时 , 启动下游服务健康检查定时器 , 通过模拟业务请求调用下游服务
- 如果调用成功了 , 并且累积成功调用次数达到一定阈值 , 此时 OpenToClosed.Allow(now time.Time) bool 根据一定概率返回 true , 断路器进入“半打开”状态
- 如果调用失败了 ,OpenToClosed.Allow(now time.Time) bool 返回 false
- 当断路器进入“半打开”状态
- 如果调用成功了 , 进行计数 , 达到阈值后 , 断路器进入“关闭”状态 , 并且停止下游健康检查定时器
- 如果调用失败了 , 断路器回到“打开”状态
package circuitimport ("math/rand""sync""time""github.com/cep21/circuit""github.com/cep21/circuit/v3/faststats")type ConfigureCloser struct {CloseOnSuccessfulAttemptsCount int64ReopenHealthCheck*HealthCheck}type Closer struct {reopenHealthCheck *HealthChecksuccessfulAttemptsfaststats.AtomicInt64closeOnSuccessfulAttemptsCount int64}func NewCloser(config ConfigureCloser) circuit.OpenToClosed {return &Closer{reopenHealthCheck:config.ReopenHealthCheck,closeOnSuccessfulAttemptsCount: config.CloseOnSuccessfulAttemptsCount,}}// start health check when circuit is openedfunc (c *Closer) Opened(now time.Time) {c.reopenHealthCheck.start()}// stop health check when circuit is closedfunc (c *Closer) Closed(now time.Time) {c.reopenHealthCheck.stop()}// half-openfunc (c *Closer) Allow(now time.Time) bool {return c.reopenHealthCheck.ok()}func (c *Closer) Success(now time.Time, duration time.Duration) {c.successfulAttempts.Add(1)}func (c *Closer) ErrBadRequest(now time.Time, duration time.Duration) {}func (c *Closer) ErrInterrupt(now time.Time, duration time.Duration) {}func (c *Closer) ErrConcurrencyLimitReject(now time.Time) {}func (c *Closer) ErrShortCircuit(now time.Time) {}func (c *Closer) ErrFailure(now time.Time, duration time.Duration) {c.successfulAttempts.Set(0)c.reopenHealthCheck.reset()}func (c *Closer) ErrTimeout(now time.Time, duration time.Duration) {c.successfulAttempts.Set(0)c.reopenHealthCheck.reset()}func (c *Closer) ShouldClose(now time.Time) bool {return c.successfulAttempts.Get() > c.closeOnSuccessfulAttemptsCount}type ConfigureHealthCheck struct {TickDurationtime.DurationRunfunc() boolThresholdint64AllowProbability int}type HealthCheck struct {runningboolstopSignalCh chan struct{}countfaststats.AtomicInt64musync.MutexconfigConfigureHealthCheck}func NewHealthCheck(config ConfigureHealthCheck) *HealthCheck {return &HealthCheck{stopSignalCh: make(chan struct{}),config:config,}}func (h *HealthCheck) start() {h.mu.Lock()defer h.mu.Unlock()if h.running {return}h.running = trueh.count.Set(0)go func() {tick := time.Tick(h.config.TickDuration)for {select {case
- 人民币|天猫国际新增“服务大类”,知舟集团提醒入驻这些类目的要注意
- 页面|如何简单、快速制作流程图?上班族的画图技巧get
- 培育|跨境电商人才如何培育,长沙有“谱”了
- 出海|出海日报丨短视频生产服务商小影科技完成近4亿元 C 轮融资;华为成为俄罗斯在线出售智能手机的第一品牌
- 抖音小店|抖音进军电商,短视频的商业模式与变现,创业者该如何抓住机遇?
- 计费|5G是如何计费的?
- 成为佛山移动服务体验官 表白留言赢取百元话费
- 车轮旋转|牵引力控制系统是如何工作的?它有什么作用?
- 正确|新昌消防丨听说,这才是微信新表情的正确打开方式
- 视频|短视频如何在前3秒吸引用户眼球?