稳定性平台的设计与实现( 三 )


func (lbrl *LeakyBucketRateLimiter) leak() {tickCh := time.Tick(lbrl.getTick())OUTER:for {select {case <-lbrl.stopCh: // stoppedbreak OUTERcase <-lbrl.changeCh: // rate limiter modifiednewTick := lbrl.getTick()tickCh = time.Tick(newTick)case <-tickCh:select {case waiterCh := <-lbrl.ch:waiterCh <- struct{}{}default:// pass}}}}func (lbrl *LeakyBucketRateLimiter) Limit() error {ch := make(chan struct{}, 1)lbrl.ch <- ch<-chreturn nil}func (lbrl *LeakyBucketRateLimiter) Close() {lbrl.stopCh <- struct{}{}}func (lbrl *LeakyBucketRateLimiter) ChangeQpsThreshold(newQpsThreshold int64) {atomic.StoreInt64( --tt-darkmode-bgcolor: #131313;">否决式限流器对于否决式限流器 , 我们选择通过滑动窗口来实现 , 并与熔断器复用同一个滑动窗口实现 。
类似地 , 限流器持有一个滑动窗口 。 滑动窗口拥有 10 个 cell , 每个 cell 负责 0.1s 时长 , 限流器统计这 1s 的请求数量 , 若未超出 qps 阈值 , 则通过 , 若超出则返回 error 。
否决式限流器的实现是非常简单的 , 无庸赘述 。
熔断器与否决式限流器都用到了滑动窗口 , 这里简单说一下滑动窗口的实现 。
一个滑动窗口 SlidingWindow 由若干个「格子」(Cell) 组成 , Cell 的数量及 Cell 的时长 , 在创建 SlidingWindow 时指定 。 大体逻辑是 , 自 epoch 以来 , 依据 Cell 的时长 , 将时间轴切成一个个的段 , 对应到 Cell 。 每个 Cell 记录了其开始时间并持有一个用于计数的 map 。
SlidingWindow 是 lazy 的 , 只有当访问某个 Cell 时 , 才会根据当前时间 , Cell 的开始时间及 Cell 的时长 , 来确定 Cell 是否过期 。 若过期则先进行重置 。 lazy 的 SlidingWindow , 使得我们不需要使用单独的 go routine 不时地更新 SlidingWindow 的内容 , 进而使得 SlidingWindow 内部不需要使用锁 。 另外 , SlidingWindow 中需要使用「当前时间」时也全由参数传入 , 尽量减少了 SlidingWindow 内部状态 , 使得其更易于测试 。
外围 API外围 API 只是对核心 API 的一层封装 , 目的是方便使用 。
以熔断器为例 , 我们定义了一个 Registry interface , 其有一个实现 EtcdRegistry。 模块中提供了 Init() 函数来创建一个 EtcdRegistry 实例 , 并赋值给私有全局 Registry 实例 。Init() 要求传入服务的标识 , 这样就能加载管理后台中各服务的配置了 。 模块的另一公开 API 是 func Do(ctx context.Context, name string, run runFunc, fallback fallbackFunc) error,name 指定调用的服务及接口名 , runFunc 为正常逻辑 , fallbackFunc 为兜底逻辑 。 这与 hystrix-go 对外暴露的接口是一致的 。Do() 内部 , 会根据 name 查到相应的熔断器 , 并调用熔断器的逻辑类似的 Do() 方法 。EtcdRegistry 持有多个 CircuitBreaker, 其在创建时 , 会开启一个 go routine , 监听 etcd 中配置的变化 , 如有变化 , 则调用相应 CircuitBreaker 的方法以修改其配置 。
这里使用全局变量 , 也是为了方便使用 。 使用者只须调用 Init() 和 Do() 函数即可 , 无须自己创建 EtcdRegistry 并监听 etcd 。 暴露的 API 尽量少 , 其他都作为实现细节隐藏 。 这样的话 , 与服务框架整合也更加方便 , 需要考虑的细节也少了很多 。
核心代码:
type Registry interface {// should be concurrency-safeGet(name string) *CircuitBreaker}func Init(servGroup, servName string) error {etcdRegistry, err := NewEtcdRegistry(servGroup, servName)if err != nil {return err}go func() {etcdRegistry.Watch()}()return InitWithRegistry(etcdRegistry)}func InitWithRegistry(newRegistry Registry) error {registry = newRegistryreturn nil}var (registry Registry = nil)func Do(ctx context.Context, name string, run runFunc, fallback fallbackFunc) error {if registry == nil {return ErrCircuitBreakerRegistryNotInited}cb := registry.Get(name)if cb == nil {return run(ctx)}return cb.Do(ctx, run, fallback)}限流器的外围 API 与之类似 , 无庸赘述 。
上线后的效果上线之后 , 查看相应服务的监控指标发现 , Go 协程数和堆对象数都有相当幅度的下降 , 甚至有些服务各方面的指标都提升了 , CPU 使用率、内存、响应时间都下降了 , 并且响应时间更加平稳 , 毛刺明显减少 。 如图:
上线当天(上线时间为 2020/08/19 11:40 左右):
稳定性平台的设计与实现文章插图
时间拉长到上线前一天及上线后两天:
稳定性平台的设计与实现文章插图
可以看到 , 即使运行几天之后 , 效果仍然明显 。
我们的服务框架 , 之前已集成了 hystrix-go 的熔断器 , 但没有限流器 。 升级之后 , 替换成了我们自己实现的熔断器与限流器 。
上线前后的不同之处在于 , 用我们的 SDK 取代了 hystrix-go 。 另外则是加上了限流器 。 当然限流器只可能增加各方面的开销 , 而不是减少(被观察的那个服务没有触发限流 , 不会因限流而减少服务的负载) 。 最终的结论是 , 我们的 SDK 性能方面优于 hystrix-go 。 至少从 go routine 方面看 , hystrix-go 每次调用至少开启两个 go routine , 而且使用了不少 channel 之类的同步设施 。 而我们的熔断器实现 , 只有全局一个 go routine 。 因此 go routine 及相关的开销是大小减少了的 。 另外 , 我们的熔断器只使用了锁这一种同步设施 , 没有使用 channel , 可能也能省掉一些开销 。