Golang之流式编程( 二 )
<- group}close(source) }() return Range(source)}Reversereverse可以对流中元素进行反转处理:
文章插图
// 例子fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) Reverse() Stream { var items []interface{}// 获取流中数据 for item := range p.source {items = append(items, item) } // 反转算法 for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - iitems[i], items[opp] = items[opp], items[i] }// 写入流 return Just(items...)}
Distinctdistinct对流中元素进行去重 , 去重在业务开发中比较常用 , 经常需要对用户id等做去重操作:
// 例子fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {return item}).ForEach(func(item interface{}) {fmt.Println(item)})// 结果为 1 , 2 , 3 , 4 , 5 , 6// 源码func (p Stream) Distinct(fn KeyFunc) Stream { source := make(chan interface{}) threading.GoSafe(func() {defer close(source)// 通过key进行去重 , 相同key只保留一个keys := make(map[interface{}]lang.PlaceholderType)for item := range p.source {key := fn(item)// key存在则不保留if _, ok := keys[key]; !ok {source <- itemkeys[key] = lang.Placeholder}} }) return Range(source)}
WalkWalk函数并发的作用在流中每一个item上 , 可以通过WithWorkers设置并发数 , 默认并发数为16 , 最小并发数为1 , 如设置unlimitedWorkers为true则并发数无限制 , 但并发写入流中的数据由defaultWorkers限制 , WalkFunc中用户可以自定义后续写入流中的元素 , 可以不写入也可以写入多个元素:
// 例子fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {newItem := strings.ToUpper(item.(string))pipe <- newItem}).ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() {var wg sync.WaitGrouppool := make(chan lang.PlaceholderType, option.workers)for {// 控制并发数量pool <- lang.Placeholderitem, ok := <-p.sourceif !ok {<-poolbreak}wg.Add(1)go func() {defer func() {wg.Done()<-pool}()// 作用在每个元素上fn(item, pipe)}()}// 等待处理完成wg.Wait()close(pipe) }() return Range(pipe)}
并发处理fx工具除了进行流数据处理以外还提供了函数并发功能 , 在微服务中实现某个功能往往需要依赖多个服务 , 并发的处理依赖可以有效的降低依赖耗时 , 提升服务的性能 。
文章插图
fx.Parallel(func() {userRPC() // 依赖1}, func() {accountRPC() // 依赖2}, func() {orderRPC() // 依赖3})
注意fx.Parallel进行依赖并行处理的时候不会有error返回 , 如需有error返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理 。
总结【Golang之流式编程】本篇文章介绍了流处理的基本概念和gozero中的流处理工具fx , 在实际的生产中流处理场景应用也非常多 , 希望本篇文章能给大家带来一定的启发 , 更好的应对工作中的流处理场景 。
- 纠结|硬杠红米Note9Pro?iQOO Z1跌至1575,对比之后纠结了!
- 王兴称美团优选目前重点是建设核心能力;苏宁旗下云网万店融资60亿元;阿里小米拟增资居然之家|8点1氪 | 美团
- 长安|长安傍上华为这个大腿,市值暴涨500亿!可见华为影响力之大?
- 巅峰|realme巅峰之作:120Hz+陶瓷机身+5000mAh 做到了颜值与性能并存
- 蛋壳公寓|官媒发声:绝不能让“割韭菜者”一跑了之!
- 看过明年的iPhone之后,现在下手的都哭了
- 直播销售员|石家庄桥西区插上“互联网+”智慧发展之翼
- 精英|业务流程图怎么绘制?销售精英的经验之谈
- 砍单|iPhone12之后,拼多多又将iPhone12Pro拉下水
- 报名啦!宿迁开展第五届“十大科技之星”评选