NewBatcher creates a new Batcher and starts it.
(ctx context.Context, opts ...BatcherOption)
| 86 | |
| 87 | // NewBatcher creates a new Batcher and starts it. |
| 88 | func NewBatcher(ctx context.Context, opts ...BatcherOption) (*DBBatcher, func(), error) { |
| 89 | b := &DBBatcher{} |
| 90 | b.log = slog.Make(sloghuman.Sink(os.Stderr)) |
| 91 | b.flushLever = make(chan struct{}, 1) // Buffered so that it doesn't block. |
| 92 | for _, opt := range opts { |
| 93 | opt(b) |
| 94 | } |
| 95 | |
| 96 | if b.store == nil { |
| 97 | return nil, nil, xerrors.Errorf("no store configured for batcher") |
| 98 | } |
| 99 | |
| 100 | if b.interval == 0 { |
| 101 | b.interval = defaultFlushInterval |
| 102 | } |
| 103 | |
| 104 | if b.batchSize == 0 { |
| 105 | b.batchSize = defaultBufferSize |
| 106 | } |
| 107 | |
| 108 | if b.tickCh == nil { |
| 109 | b.ticker = time.NewTicker(b.interval) |
| 110 | b.tickCh = b.ticker.C |
| 111 | } |
| 112 | |
| 113 | b.initBuf(b.batchSize) |
| 114 | |
| 115 | cancelCtx, cancelFunc := context.WithCancel(ctx) |
| 116 | done := make(chan struct{}) |
| 117 | go func() { |
| 118 | b.run(cancelCtx) |
| 119 | close(done) |
| 120 | }() |
| 121 | |
| 122 | closer := func() { |
| 123 | cancelFunc() |
| 124 | if b.ticker != nil { |
| 125 | b.ticker.Stop() |
| 126 | } |
| 127 | <-done |
| 128 | } |
| 129 | |
| 130 | return b, closer, nil |
| 131 | } |
| 132 | |
| 133 | // Add adds a stat to the batcher for the given workspace and agent. |
| 134 | func (b *DBBatcher) Add( |