Run runs the batcher.
(ctx context.Context)
| 180 | |
| 181 | // run is the batcher's main loop: it flushes on every tickCh or flushLever signal and, once ctx is done, performs one final forced flush and returns. |
| 182 | func (b *DBBatcher) run(ctx context.Context) { |
| 183 | // nolint:gocritic // This is only ever used for one thing - inserting agent stats. |
| 184 | authCtx := dbauthz.AsSystemRestricted(ctx) |
| 185 | for { |
| 186 | select { |
| 187 | case <-b.tickCh: |
| 188 | b.flush(authCtx, false, "scheduled") |
| 189 | case <-b.flushLever: |
| 190 | // A signal on the flush lever requests an immediate, forced flush of the buffer. |
| 191 | b.flush(authCtx, true, "reaching capacity") |
| 192 | case <-ctx.Done(): |
| 193 | b.log.Debug(ctx, "context done, flushing before exit") |
| 194 | |
| 195 | // The parent context is already done, so the final flush runs under a fresh context bounded to 15 seconds. |
| 196 | ctxTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second) |
| 197 | defer cancel() //nolint:revive // We're returning, defer is fine. |
| 198 | |
| 199 | // nolint:gocritic // This is only ever used for one thing - inserting agent stats. |
| 200 | b.flush(dbauthz.AsSystemRestricted(ctxTimeout), true, "exit") |
| 201 | return |
| 202 | } |
| 203 | } |
| 204 | } |
| 205 | |
| 206 | // flush flushes the batcher's buffer. |
| 207 | func (b *DBBatcher) flush(ctx context.Context, forced bool, reason string) { |
no test coverage detected