Golang之流式编程( 二 )

<- group}close(source) }() return Range(source)}Reversereverse可以对流中元素进行反转处理:
Golang之流式编程文章插图
// 例子fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) Reverse() Stream { var items []interface{}// 获取流中数据 for item := range p.source {items = append(items, item) } // 反转算法 for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - iitems[i], items[opp] = items[opp], items[i] }// 写入流 return Just(items...)}Distinctdistinct对流中元素进行去重 , 去重在业务开发中比较常用 , 经常需要对用户id等做去重操作:
// 例子fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {return item}).ForEach(func(item interface{}) {fmt.Println(item)})// 结果为 1 , 2 , 3 , 4 , 5 , 6// 源码func (p Stream) Distinct(fn KeyFunc) Stream { source := make(chan interface{}) threading.GoSafe(func() {defer close(source)// 通过key进行去重 , 相同key只保留一个keys := make(map[interface{}]lang.PlaceholderType)for item := range p.source {key := fn(item)// key存在则不保留if _, ok := keys[key]; !ok {source <- itemkeys[key] = lang.Placeholder}} }) return Range(source)}WalkWalk函数并发的作用在流中每一个item上 , 可以通过WithWorkers设置并发数 , 默认并发数为16 , 最小并发数为1 , 如设置unlimitedWorkers为true则并发数无限制 , 但并发写入流中的数据由defaultWorkers限制 , WalkFunc中用户可以自定义后续写入流中的元素 , 可以不写入也可以写入多个元素:
// 例子fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {newItem := strings.ToUpper(item.(string))pipe <- newItem}).ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() {var wg sync.WaitGrouppool := make(chan lang.PlaceholderType, option.workers)for {// 控制并发数量pool <- lang.Placeholderitem, ok := <-p.sourceif !ok {<-poolbreak}wg.Add(1)go func() {defer func() {wg.Done()<-pool}()// 作用在每个元素上fn(item, pipe)}()}// 等待处理完成wg.Wait()close(pipe) }() return Range(pipe)}并发处理fx工具除了进行流数据处理以外还提供了函数并发功能 , 在微服务中实现某个功能往往需要依赖多个服务 , 并发的处理依赖可以有效的降低依赖耗时 , 提升服务的性能 。
Golang之流式编程文章插图
fx.Parallel(func() {userRPC() // 依赖1}, func() {accountRPC() // 依赖2}, func() {orderRPC() // 依赖3})注意fx.Parallel进行依赖并行处理的时候不会有error返回 , 如需有error返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理 。
总结【Golang之流式编程】本篇文章介绍了流处理的基本概念和gozero中的流处理工具fx , 在实际的生产中流处理场景应用也非常多 , 希望本篇文章能给大家带来一定的启发 , 更好的应对工作中的流处理场景 。