Out of curiosity, I wanted to try aggregating some data before processing it. The scenario:

  1. Multiple producers are producing data;
  2. The consumer should process in batches where possible, say 10 items per batch;
  3. There is a minimum flush interval: if, say, no producer has added a task for 20ms, push whatever has accumulated (e.g. 6 items) to the out chan; if nothing has accumulated, just keep spinning.

Basic implementation:

func (c *Core) Start(ctx context.Context) error {
	outItem := make([]interface{}, 0)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-c.Ticker.C:
			// Maximum flush interval reached (the ticker is not reset when
			// items arrive, so this is a max-latency flush rather than a
			// strict "idle for 20ms" timeout).
			if len(outItem) == 0 {
				// Nothing accumulated, keep spinning.
				continue
			}
			// There is data; try to push it to the out chan.
			select {
			case c.out <- outItem:
				outItem = make([]interface{}, 0)
				atomic.AddInt64(&c.outCounter, 1)
			default:
				// out chan is full, keep spinning.
				continue
			}
		case item := <-c.in:
			// Read data that external producers wrote to the in chan.
			outItem = append(outItem, item)
			atomic.AddInt64(&c.inCounter, -1)
		}

		// The accumulated data has reached the configured batch size; try to send it.
		if len(outItem) >= int(c.BatchSize) {
			select {
			case c.out <- outItem:
				outItem = make([]interface{}, 0)
				atomic.AddInt64(&c.outCounter, 1)
			default:
				// out chan is full, keep spinning.
				continue
			}
		}
	}
}
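
The Start method references a few fields that aren't shown above. Here is a minimal sketch of what the Core struct might look like, with field names and types guessed purely from how Start uses them; the actual definition in the repo may differ, and NewCore presumably wires these up together with the channel buffer sizes:

package valve

import "time"

// Core as inferred from Start's usage; an assumption, not the repo's definition.
type Core struct {
	Ticker    *time.Ticker // fires the max-interval flush
	BatchSize int64        // flush as soon as this many items have accumulated

	in  chan interface{}   // producers write single items here (via Add)
	out chan []interface{} // consumers read whole batches from here (via Receive)

	inCounter  int64 // pending items, maintained with atomics
	outCounter int64 // batches emitted so far
}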

The functionality is pretty bare-bones; the code lives at https://github.com/ringsaturn/valve
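
The producer-facing Add used in the demo below isn't shown above either. It is presumably just a context-guarded send into the in chan that also bumps the pending counter; a sketch of that assumption, continuing the package sketch above (it would also import context and sync/atomic), not necessarily the actual repo code:

// Add enqueues one item, giving up when ctx expires (e.g. because the in chan is full).
// Sketch only: a method on the Core struct guessed above.
func (c *Core) Add(ctx context.Context, item interface{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case c.in <- item:
		atomic.AddInt64(&c.inCounter, 1) // Start decrements this when it drains the item
		return nil
	}
}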

A usage demo:

// https://github.com/ringsaturn/valve/blob/main/example/main.go
package main

import (
	"context"
	"log"
	"time"

	"github.com/ringsaturn/valve"
	"golang.org/x/sync/errgroup"
)

type Task struct {
	Time time.Time
}

func Producer(valveCore *valve.Core, producerFreq time.Duration, timeout time.Duration) error {
	ticker := time.NewTicker(producerFreq)
	for {
		select {
		case <-ticker.C:
			addFunc := func() error {
				ctx, cancel := context.WithTimeout(context.TODO(), timeout)
				defer cancel()
				return valveCore.Add(ctx, Task{Time: time.Now()})
			}

			if err := addFunc(); err != nil {
				return err
			}
		}
	}
}

func Consumer(valveCore *valve.Core, ioTime time.Duration) error {
	out, err := valveCore.Receive()
	if err != nil {
		return err
	}
	for {
		select {
		case batchItem := <-out:
			valveCore.DoneInCounter()
			log.Println("batchItem", valveCore.GetInCounter(), batchItem)
			// mock IO
			time.Sleep(ioTime)
		}
		log.Println("lag", valveCore.GetInCounter(), valveCore.GetOutinCounter())
	}
}

func main() {
	initCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	valveCore, err := valve.NewCore(time.NewTicker(100*time.Millisecond), 100, 100, 2)
	if err != nil {
		panic(err)
	}

	group, groupCtx := errgroup.WithContext(initCtx)

	producerCount := 5
	startWorkerCount := 5
	consumerCount := 3

	for i := 0; i < producerCount; i++ {
		group.Go(func() error {
			return Producer(valveCore, time.Millisecond, 10*time.Millisecond)
		})
	}

	for i := 0; i < startWorkerCount; i++ {
		group.Go(func() error {
			return valveCore.Start(groupCtx)
		})
	}

	for i := 0; i < consumerCount; i++ {
		group.Go(func() error {
			return Consumer(valveCore, 100*time.Millisecond)
		})
	}

	panic(group.Wait())
}
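
Note that the demo never shuts down cleanly: the producers and consumers loop forever, so group.Wait() only returns once something errors out, most likely a producer's Add hitting its 10ms timeout after the in chan backs up, since three consumers spending 100ms of mock IO per batch cannot keep up with five producers each adding an item every millisecond.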