MCPcopy
hub / github.com/grafana/tempo / WaitForKafkaBroker

Function WaitForKafkaBroker

pkg/ingest/util.go:55–74  ·  view source on GitHub ↗

It retry until Kafka broker is ready

(ctx context.Context, c *kgo.Client, l log.Logger)

Source from the content-addressed store, hash-verified

53
54// It retry until Kafka broker is ready
55func WaitForKafkaBroker(ctx context.Context, c *kgo.Client, l log.Logger) error {
56 boff := backoff.New(ctx, backoff.Config{
57 MinBackoff: 100 * time.Millisecond,
58 MaxBackoff: time.Minute, // If there is a network hiccup, we prefer to wait longer retrying, than fail the service.
59 MaxRetries: 10,
60 })
61
62 for boff.Ongoing() {
63 err := c.Ping(ctx)
64 if err == nil {
65 break
66 }
67 level.Warn(l).Log("msg", "ping kafka; will retry", "err", err)
68 boff.Wait()
69 }
70 if err := boff.ErrCause(); err != nil {
71 return fmt.Errorf("kafka broker not ready after %d retries: %w", boff.NumRetries(), err)
72 }
73 return nil
74}
75
76func HandleKafkaError(err error, forceMetadataRefresh func()) {
77 if err == nil {

Callers 3

startingMethod · 0.92
startingMethod · 0.92
startKafkaIngestPathMethod · 0.92

Calls 3

PingMethod · 0.80
LogMethod · 0.65
WaitMethod · 0.65

Tested by

no test coverage detected