Go 里聚合数据批量处理
出于好奇,想试试能否先把数据聚合起来,再批量处理。场景如下:
- 有多个生产者在生产数据;
- 消费者尽量批量处理,比如每个批次 10 条;
- 设有一个最大等待间隔:比如生产者已经有 20ms 没有添加任务,就需要把已积累的数据(例如 6 条)推送到 out chan 中;如果没有积累数据,就继续空转。
基本实现:
// Start runs one batching loop until ctx is cancelled, then returns ctx.Err().
//
// Items read from c.in are collected into a local batch, which is handed to
// c.out as a single slice when either:
//   - the batch reaches c.BatchSize items: this send blocks (or returns on
//     cancellation) instead of reading further from c.in, so a batch never
//     grows past c.BatchSize while c.out is full; or
//   - c.Ticker fires and the batch is non-empty: this send is non-blocking and
//     is simply retried on a later tick (or when the batch fills up) if c.out
//     is full.
//
// Every item read decrements c.inCounter; every batch sent increments
// c.outCounter. Items still in the local batch when ctx is cancelled are
// dropped.
//
// NOTE(review): the partial-batch flush is driven by a periodic ticker, not by
// producer idle time. Also, a time.Ticker's channel delivers each tick to only
// one receiver, so several Start workers sharing c.Ticker each see only part
// of the ticks.
func (c *Core) Start(ctx context.Context) error {
	batchSize := int(c.BatchSize)
	// The receiver of a sent batch shares its backing array, so after each
	// successful send the local batch is replaced with nil rather than
	// truncated and reused.
	var batch []interface{}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-c.Ticker.C:
			// Periodic flush of a partial batch; nothing to do when empty.
			if len(batch) == 0 {
				continue
			}
			select {
			case c.out <- batch:
				batch = nil
				atomic.AddInt64(&c.outCounter, 1)
			default:
				// c.out is full: keep accumulating and retry later.
			}

		case item := <-c.in:
			batch = append(batch, item)
			atomic.AddInt64(&c.inCounter, -1)
			if len(batch) < batchSize {
				continue
			}
			// The batch is full: wait for a consumer instead of reading more
			// input, which keeps batch size bounded by c.BatchSize.
			select {
			case c.out <- batch:
				batch = nil
				atomic.AddInt64(&c.outCounter, 1)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}
功能比较简陋,完整代码见 https://github.com/ringsaturn/valve 。
使用示例如下:
// https://github.com/ringsaturn/valve/blob/main/example/main.go
package main
import (
"context"
"log"
"time"
"github.com/ringsaturn/valve"
"golang.org/x/sync/errgroup"
)
// Task is the value each Producer hands to the valve core; Time records the
// moment it was produced.
type Task struct{ Time time.Time }
// Producer submits one Task to valveCore on every tick of a ticker with period
// producerFreq. Each submission gets its own context bounded by timeout.
//
// It loops until an Add call fails and then returns that error.
//
// NOTE(review): Producer has no parent context, so it cannot be stopped by
// cancellation (e.g. of an errgroup context); consider threading a ctx in.
func Producer(valveCore *valve.Core, producerFreq time.Duration, timeout time.Duration) error {
	ticker := time.NewTicker(producerFreq)
	// Release the ticker when Producer returns (it only returns on error).
	defer ticker.Stop()

	for {
		<-ticker.C
		if err := addTaskWithTimeout(valveCore, timeout); err != nil {
			return err
		}
	}
}

// addTaskWithTimeout submits one Task stamped with the current time, giving the
// Add call at most timeout to complete.
func addTaskWithTimeout(valveCore *valve.Core, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
	defer cancel()
	return valveCore.Add(ctx, Task{Time: time.Now()})
}
// Consumer receives batches from valveCore.Receive and handles them one at a
// time. For each batch, it calls DoneInCounter, logs the batch, sleeps ioTime
// to simulate I/O, and then logs the core's in/out counters.
//
// It returns the error from Receive, or nil once the receive channel is closed.
func Consumer(valveCore *valve.Core, ioTime time.Duration) error {
	out, err := valveCore.Receive()
	if err != nil {
		return err
	}
	// Ranging (instead of a single-case select) ends the loop if out is
	// closed, rather than spinning forever on zero values.
	for batchItem := range out {
		valveCore.DoneInCounter()
		log.Println("batchItem", valveCore.GetInCounter(), batchItem)
		// Mock I/O.
		time.Sleep(ioTime)
		log.Println("lag", valveCore.GetInCounter(), valveCore.GetOutinCounter())
	}
	return nil
}
// main runs the demo: producerCount producers feed one shared valve.Core,
// startWorkerCount goroutines run its Start batching loop, and consumerCount
// consumers process the resulting batches, all inside a single errgroup.
//
// NOTE(review): Producer and Consumer take no context, so cancelling groupCtx
// (which errgroup does on the first non-nil error) only stops the Start
// workers; group.Wait may keep blocking on the others afterwards.
func main() {
	initCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Keep the ticker in a variable so it is stopped when main exits.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	valveCore, err := valve.NewCore(ticker, 100, 100, 2)
	if err != nil {
		panic(err)
	}

	const (
		producerCount    = 5
		startWorkerCount = 5
		consumerCount    = 3
	)

	group, groupCtx := errgroup.WithContext(initCtx)
	for i := 0; i < producerCount; i++ {
		group.Go(func() error {
			return Producer(valveCore, time.Millisecond, 10*time.Millisecond)
		})
	}
	for i := 0; i < startWorkerCount; i++ {
		group.Go(func() error {
			return valveCore.Start(groupCtx)
		})
	}
	for i := 0; i < consumerCount; i++ {
		group.Go(func() error {
			return Consumer(valveCore, 100*time.Millisecond)
		})
	}

	// Only panic on a real error: panic(nil) would still crash the program
	// even if every goroutine returned nil.
	if err := group.Wait(); err != nil {
		panic(err)
	}
}
Read other posts