好奇想试下能否实现聚合一些数据然后再处理,情况是:

  1. 有多个生产者在生产数据;
  2. 消费者尽量批量处理,比如一个批次是 10;
  3. 有个最小处理时间间隔, 比如生产者有 20ms 没有添加任务了,需要把积累的数据(如有 6 个)推到 out chan 中; 如果没有积累的数据就继续空转;

基本实现:

func (c *Core) Start(ctx context.Context) error {
	outItem := make([]interface{}, 0)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-c.Ticker.C:
			// 最大间隔
			if len(outItem) == 0 {
				// 没有数据就继续空转
				continue
			}
			// 有数据尝试发送到 out chan 中
			select {
			case c.out <- outItem:
				outItem = make([]interface{}, 0)
				atomic.AddInt64(&c.outCounter, 1)
			default:
				// out chan 满了,继续空转
				continue
			}
		case item := <-c.in:
			// 从外部生产者写入的 in chan 读数据
			outItem = append(outItem, item)
			atomic.AddInt64(&c.inCounter, -1)
		}

		// 积累的数据量达到了定义好的一次批次,尝试发出去
		if len(outItem) >= int(c.BatchSize) {
			select {
			case c.out <- outItem:
				outItem = make([]interface{}, 0)
				atomic.AddInt64(&c.outCounter, 1)
			default:
				// out chan 满了,继续循转
				continue
			}
		}
	}
}

功能比较简陋,代码在 https://github.com/ringsaturn/valve

使用 demo 如下:

// https://github.com/ringsaturn/valve/blob/main/example/main.go
package main

import (
	"context"
	"log"
	"time"

	"github.com/ringsaturn/valve"
	"golang.org/x/sync/errgroup"
)

type Task struct {
	Time time.Time
}

func Producer(valveCore *valve.Core, producerFreq time.Duration, timeout time.Duration) error {
	ticker := time.NewTicker(producerFreq)
	for {
		select {
		case <-ticker.C:
			addFunc := func() error {
				ctx, cancel := context.WithTimeout(context.TODO(), timeout)
				defer cancel()
				return valveCore.Add(ctx, Task{Time: time.Now()})
			}

			if err := addFunc(); err != nil {
				return err
			}
		}
	}
}

func Consumer(valveCore *valve.Core, ioTime time.Duration) error {
	out, err := valveCore.Receive()
	if err != nil {
		return err
	}
	for {
		select {
		case batchItem := <-out:
			valveCore.DoneInCounter()
			log.Println("batchItem", valveCore.GetInCounter(), batchItem)
			// mock IO
			time.Sleep(ioTime)
		}
		log.Println("lag", valveCore.GetInCounter(), valveCore.GetOutinCounter())
	}
}

func main() {
	initCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	valveCore, err := valve.NewCore(time.NewTicker(100*time.Millisecond), 100, 100, 2)
	if err != nil {
		panic(err)
	}

	group, groupCtx := errgroup.WithContext(initCtx)

	producerCount := 5
	startWorkerCount := 5
	consumerCount := 3

	for i := 0; i < producerCount; i++ {
		group.Go(func() error {
			return Producer(valveCore, time.Millisecond, 10*time.Millisecond)
		})
	}

	for i := 0; i < startWorkerCount; i++ {
		group.Go(func() error {
			return valveCore.Start(groupCtx)
		})
	}

	for i := 0; i < consumerCount; i++ {
		group.Go(func() error {
			return Consumer(valveCore, 100*time.Millisecond)
		})
	}

	panic(group.Wait())
}