hub / github.com/segmentio/kafka-go / Start

Method Start

consumergroup.go:375–413

Start launches the provided function in a goroutine and adds accounting such that when the function exits, it stops the current generation (if not already in the process of doing so). The provided function MUST support cancellation via the ctx argument and exit in a timely manner once the ctx is c…

(fn func(ctx context.Context))

Source from the content-addressed store, hash-verified

// progress for this consumer and potentially cause consumer group membership
// churn.
func (g *Generation) Start(fn func(ctx context.Context)) {
	g.lock.Lock()
	defer g.lock.Unlock()

	// this is an edge case: if the generation has already closed, then it's
	// possible that the close func has already waited on outstanding go
	// routines and exited.
	//
	// nonetheless, it's important to honor that the fn is invoked in case the
	// calling function is waiting e.g. on a channel send or a WaitGroup. in
	// such a case, fn should immediately exit because ctx.Err() will return
	// ErrGenerationEnded.
	if g.closed {
		go fn(genCtx{g})
		return
	}

	// register that there is one more go routine that's part of this gen.
	g.routines++

	go func() {
		fn(genCtx{g})
		g.lock.Lock()
		// shut down the generation as soon as one function exits. this is
		// different from close() in that it doesn't wait for all go routines in
		// the generation to exit.
		if !g.closed {
			close(g.done)
			g.closed = true
		}
		g.routines--
		// if this was the last go routine in the generation, close the joined
		// chan so that close() can exit if it's waiting.
		if g.routines == 0 {
			close(g.joined)
		}
		g.lock.Unlock()
	}()
}

// CommitOffsets commits the provided topic+partition+offset combos to the
// consumer group coordinator. This can be used to reset the consumer to
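
The accounting in Start (the first function to exit closes done, the last one to exit closes joined) can be tried in isolation with a toy model. This is a sketch under stated assumptions, not the library's code: miniGen, newMiniGen, and start are hypothetical names, and the done channel is passed to the function directly in place of the genCtx context.

```go
package main

import (
	"fmt"
	"sync"
)

// miniGen is a hypothetical, stripped-down stand-in for Generation.
type miniGen struct {
	mu       sync.Mutex
	closed   bool
	routines int
	done     chan struct{} // closed when the generation starts shutting down
	joined   chan struct{} // closed when the last tracked goroutine exits
}

func newMiniGen() *miniGen {
	return &miniGen{done: make(chan struct{}), joined: make(chan struct{})}
}

// start mirrors the bookkeeping shown in Start above.
func (g *miniGen) start(fn func(done <-chan struct{})) {
	g.mu.Lock()
	defer g.mu.Unlock()

	// Already closed: still run fn, but do not track it.
	if g.closed {
		go fn(g.done)
		return
	}

	g.routines++
	go func() {
		fn(g.done)
		g.mu.Lock()
		defer g.mu.Unlock()
		// The first function to exit shuts the generation down.
		if !g.closed {
			close(g.done)
			g.closed = true
		}
		g.routines--
		// The last function to exit releases anyone waiting on joined.
		if g.routines == 0 {
			close(g.joined)
		}
	}()
}

func main() {
	g := newMiniGen()
	// A long-lived worker that only returns once done is closed.
	g.start(func(done <-chan struct{}) { <-done })
	// A short-lived worker whose exit triggers the shutdown.
	g.start(func(done <-chan struct{}) {})

	<-g.joined
	fmt.Println("all goroutines exited")
}
```

Because no new function is tracked once closed is set, the routines counter can reach zero only once, so joined is closed at most once.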

Callers 5

run · Method · 0.95
heartbeatLoop · Method · 0.95
partitionWatcher · Method · 0.95

Calls

no outgoing calls