It retries until the Kafka broker is ready
(ctx context.Context, c *kgo.Client, l log.Logger)
| 53 | |
| 54 | // It retry until Kafka broker is ready |
| 55 | func WaitForKafkaBroker(ctx context.Context, c *kgo.Client, l log.Logger) error { |
| 56 | boff := backoff.New(ctx, backoff.Config{ |
| 57 | MinBackoff: 100 * time.Millisecond, |
| 58 | MaxBackoff: time.Minute, // If there is a network hiccup, we prefer to wait longer retrying, than fail the service. |
| 59 | MaxRetries: 10, |
| 60 | }) |
| 61 | |
| 62 | for boff.Ongoing() { |
| 63 | err := c.Ping(ctx) |
| 64 | if err == nil { |
| 65 | break |
| 66 | } |
| 67 | level.Warn(l).Log("msg", "ping kafka; will retry", "err", err) |
| 68 | boff.Wait() |
| 69 | } |
| 70 | if err := boff.ErrCause(); err != nil { |
| 71 | return fmt.Errorf("kafka broker not ready after %d retries: %w", boff.NumRetries(), err) |
| 72 | } |
| 73 | return nil |
| 74 | } |
| 75 | |
| 76 | func HandleKafkaError(err error, forceMetadataRefresh func()) { |
| 77 | if err == nil { |