Golang之流式编程

流处理(Stream processing)是一种计算机编程范式 , 其允许给定一个数据序列(流处理数据源) , 一系列数据操作(函数)被应用到流中的每个元素 。 同时流处理工具可以显著提高程序员的开发效率 , 允许他们编写有效、干净和简洁的代码 。
流数据处理在我们的日常工作中非常常见 , 举个例子 , 我们在业务开发中往往会记录许多业务日志 , 这些日志一般是先发送到Kafka , 然后再由Job消费Kafaka写到elasticsearch , 在进行日志流处理的过程中 , 往往还会对日志做一些处理 , 比如过滤无效的日志 , 做一些计算以及重新组合日志等等 , 示意图如下:
Golang之流式编程文章插图
流处理工具fxgozero是一个功能完备的微服务框架 , 框架中内置了很多非常实用的工具 , 其中就包含流数据处理工具fx , 下面我们通过一个简单的例子来认识下该工具:
package mainimport ( "fmt" "os" "os/signal" "syscall" "time" "github.com/tal-tech/go-zero/core/fx")func main() { ch := make(chan int) go inputStream(ch) go outputStream(ch) c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) <-c}func inputStream(ch chan int) { count := 0 for {ch <- counttime.Sleep(time.Millisecond * 500)count++ }}func outputStream(ch chan int) { fx.From(func(source chan<- interface{}) {for c := range ch {source <- c} }).Walk(func(item interface{}, pipe chan<- interface{}) {count := item.(int)pipe <- count }).Filter(func(item interface{}) bool {itemInt := item.(int)if itemInt%2 == 0 {return true}return false }).ForEach(func(item interface{}) {fmt.Println(item) })}inputStream函数模拟了流数据的产生 , outputStream函数模拟了流数据的处理过程 , 其中From函数为流的输入 , Walk函数并发的作用在每一个item上 , Filter函数对item进行过滤为true保留为false不保留 , ForEach函数遍历输出每一个item元素 。
流数据处理中间操作一个流的数据处理可能存在许多的中间操作 , 每个中间操作都可以作用在流上 。 就像流水线上的工人一样 , 每个工人操作完零件后都会返回处理完成的新零件 , 同理流处理中间操作完成后也会返回一个新的流 。
Golang之流式编程文章插图
fx的流处理中间操作:
操作函数功能输入Distinct去除重复的itemKeyFunc , 返回需要去重的keyFilter过滤不满足条件的itemFilterFunc , Option控制并发量Group对item进行分组KeyFunc , 以key进行分组Head取出前n个item , 返回新streamint64保留数量Map对象转换MapFunc , Option控制并发量Merge合并item到slice并生成新streamReverse反转itemSort对item进行排序LessFunc实现排序算法Tail与Head功能类似 , 取出后n个item组成新streamint64保留数量Walk作用在每个item上WalkFunc , Option控制并发量
下图展示了每个步骤和每个步骤的结果:
Golang之流式编程文章插图
用法与原理分析From通过From函数构建流并返回Stream , 流数据通过channel进行存储:
// 例子s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}fx.From(func(source chan<- interface{}) {for _, v := range s {source <- v}})// 源码func From(generate GenerateFunc) Stream { source := make(chan interface{}) go func() {defer close(source)// 构造流数据写入channelgenerate(source) }() return Range(source)}FilterFilter函数提供过滤item的功能 , FilterFunc定义过滤逻辑true保留item , false则不保留:
// 例子 保留偶数s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}fx.From(func(source chan<- interface{}) {for _, v := range s {source <- v}}).Filter(func(item interface{}) bool {if item.(int)%2 == 0 {return true}return false})// 源码func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream { return p.Walk(func(item interface{}, pipe chan<- interface{}) {// 执行过滤函数true保留 , false丢弃if fn(item) {pipe <- item} }, opts...)}GroupGroup对流数据进行分组 , 需定义分组的key , 数据分组后以slice存入channel:
// 例子 按照首字符"g"或者"p"分组 , 没有则分到另一组ss := []string{"golang", "google", "php", "python", "java", "c++"}fx.From(func(source chan<- interface{}) {for _, s := range ss {source <- s} }).Group(func(item interface{}) interface{} {if strings.HasPrefix(item.(string), "g") {return "g"} else if strings.HasPrefix(item.(string), "p") {return "p"}return "" }).ForEach(func(item interface{}) {fmt.Println(item) })}// 源码func (p Stream) Group(fn KeyFunc) Stream {// 定义分组存储map groups := make(map[interface{}][]interface{}) for item := range p.source {// 用户自定义分组keykey := fn(item)// key相同分到一组groups[key] = append(groups[key], item) } source := make(chan interface{}) go func() {for _, group := range groups {// 相同key的一组数据写入到channelsource