1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
| // https://github.com/ringsaturn/valve/blob/main/example/main.go
package main
import (
"context"
"log"
"time"
"github.com/ringsaturn/valve"
"golang.org/x/sync/errgroup"
)
// Task is the item the producers in this example push into the valve.
// It carries nothing but the moment at which it was created.
type Task struct {
	// Time is the creation timestamp of the task.
	Time time.Time
}
// Producer adds a new Task to valveCore on every tick of a ticker that fires
// every producerFreq, and keeps doing so until an Add call fails.
//
// Every Add call runs under its own context limited to timeout. The function
// returns only when valveCore.Add reports an error; that error is returned
// unchanged. producerFreq must be positive because time.NewTicker panics on a
// non-positive interval.
func Producer(valveCore *valve.Core, producerFreq time.Duration, timeout time.Duration) error {
	ticker := time.NewTicker(producerFreq)
	// Stop the ticker when the producer gives up so it does not keep firing
	// after nobody is listening any more.
	defer ticker.Stop()

	// add performs one enqueue attempt. It is a function of its own so that
	// the deferred cancel runs after each attempt instead of accumulating for
	// the lifetime of the loop below.
	add := func() error {
		ctx, cancel := context.WithTimeout(context.TODO(), timeout)
		defer cancel()
		return valveCore.Add(ctx, Task{Time: time.Now()})
	}

	for {
		// Wait for the next tick; there is nothing else to select on.
		<-ticker.C
		if err := add(); err != nil {
			return err
		}
	}
}
// Consumer drains batches from valveCore and simulates slow downstream work.
//
// It obtains the output channel from valveCore.Receive and returns that
// call's error if there is one. For every batch received it calls
// valveCore.DoneInCounter, logs the batch, sleeps for ioTime to mimic I/O,
// and then logs both counters under the "lag" label.
//
// Consumer returns nil if the output channel is closed; as long as the channel
// stays open it never returns.
func Consumer(valveCore *valve.Core, ioTime time.Duration) error {
	out, err := valveCore.Receive()
	if err != nil {
		return err
	}

	// Ranging over the channel ends the loop when the channel is closed,
	// rather than receiving zero-value batches forever and counting each of
	// them as done.
	for batchItem := range out {
		valveCore.DoneInCounter()
		log.Println("batchItem", valveCore.GetInCounter(), batchItem)

		// mock IO
		time.Sleep(ioTime)

		log.Println("lag", valveCore.GetInCounter(), valveCore.GetOutinCounter())
	}
	return nil
}
// main wires the example together: it builds a valve.Core driven by a 100ms
// ticker, then runs producers, Start workers and consumers side by side in a
// single errgroup and waits for all of them.
//
// It panics if the core cannot be created or if the group reports an error.
// When every goroutine finishes without error, main returns normally.
func main() {
	initCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	valveCore, err := valve.NewCore(time.NewTicker(100*time.Millisecond), 100, 100, 2)
	if err != nil {
		panic(err)
	}

	// groupCtx is cancelled as soon as any goroutine in the group returns a
	// non-nil error; only the Start workers observe it.
	group, groupCtx := errgroup.WithContext(initCtx)

	// Number of goroutines launched for each role.
	const (
		producerCount    = 5
		startWorkerCount = 5
		consumerCount    = 3
	)

	for i := 0; i < producerCount; i++ {
		group.Go(func() error {
			return Producer(valveCore, time.Millisecond, 10*time.Millisecond)
		})
	}

	for i := 0; i < startWorkerCount; i++ {
		group.Go(func() error {
			return valveCore.Start(groupCtx)
		})
	}

	for i := 0; i < consumerCount; i++ {
		group.Go(func() error {
			return Consumer(valveCore, 100*time.Millisecond)
		})
	}

	// Only a real failure is worth a panic. Passing a nil result straight to
	// panic would crash the program even after a clean shutdown.
	if err := group.Wait(); err != nil {
		panic(err)
	}
}
|