MCPcopy
hub / github.com/segmentio/kafka-go / run

Method run

reader.go:291–348  ·  view source on GitHub ↗

run provides the main consumer group management loop. Each iteration performs the handshake to join the Reader to the consumer group. This function is responsible for closing the consumer group upon exit.

(cg *ConsumerGroup)

Source from the content-addressed store, hash-verified

289//
290// This function is responsible for closing the consumer group upon exit.
291func (r *Reader) run(cg *ConsumerGroup) {
292 defer close(r.done)
293 defer cg.Close()
294
295 r.withLogger(func(l Logger) {
296 l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
297 })
298
299 for {
300 // Limit the number of attempts at waiting for the next
301 // consumer generation.
302 var err error
303 var gen *Generation
304 for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
305 gen, err = cg.Next(r.stctx)
306 if err == nil {
307 break
308 }
309 if errors.Is(err, r.stctx.Err()) {
310 return
311 }
312 r.stats.errors.observe(1)
313 r.withErrorLogger(func(l Logger) {
314 l.Printf("%v", err)
315 })
316 // Continue with next attempt...
317 }
318 if err != nil {
319 // All attempts have failed.
320 select {
321 case r.runError <- err:
322 // If somebody's receiving on the runError, let
323 // them know the error occurred.
324 default:
325 // Otherwise, don't block to allow healing.
326 }
327 continue
328 }
329
330 r.stats.rebalances.observe(1)
331
332 r.subscribe(gen.Assignments)
333
334 gen.Start(func(ctx context.Context) {
335 r.commitLoop(ctx, gen)
336 })
337 gen.Start(func(ctx context.Context) {
338 // wait for the generation to end and then unsubscribe.
339 select {
340 case <-ctx.Done():
341 // continue to next generation
342 case <-r.stctx.Done():
343 // this will be the last loop because the reader is closed.
344 }
345 r.unsubscribe()
346 })
347 }
348}

Callers 2

NewReader Function · 0.95
start Method · 0.45

Calls 12

withLogger Method · 0.95
withErrorLogger Method · 0.95
subscribe Method · 0.95
Start Method · 0.95
commitLoop Method · 0.95
unsubscribe Method · 0.95
Done Method · 0.80
Printf Method · 0.65
Next Method · 0.65
Close Method · 0.45
Err Method · 0.45
observe Method · 0.45

Tested by

no test coverage detected