| 622 | } |
| 623 | |
| 624 | func (b *BlockBuilder) stopping(err error) error { |
| 625 | select { |
| 626 | case <-b.consumeStopped: |
| 627 | case <-time.After(60 * time.Second): |
| 628 | // 60s is the default terminationGracePeriod for the BlockBuilder's statefulSet |
| 629 | level.Error(b.logger).Log("msg", "failed to gracefully stop", "err", err) |
| 630 | } |
| 631 | if b.kafkaClient != nil { |
| 632 | b.kafkaClient.Close() |
| 633 | } |
| 634 | return err |
| 635 | } |
| 636 | |
| 637 | func (b *BlockBuilder) pushTraces(ts time.Time, tenantBytes, reqBytes []byte, p partitionSectionWriter) error { |
| 638 | req, err := b.decoder.Decode(reqBytes) |