Start launches the provided function in a go routine and adds accounting such that when the function exits, it stops the current generation (if not already in the process of doing so). The provided function MUST support cancellation via the ctx argument and exit in a timely manner once the ctx is c
(fn func(ctx context.Context))
| 373 | // progress for this consumer and potentially cause consumer group membership |
| 374 | // churn. |
| 375 | func (g *Generation) Start(fn func(ctx context.Context)) { |
| 376 | g.lock.Lock() |
| 377 | defer g.lock.Unlock() |
| 378 | |
| 379 | // this is an edge case: if the generation has already closed, then it's |
| 380 | // possible that the close func has already waited on outstanding go |
| 381 | // routines and exited. |
| 382 | // |
| 383 | // nonetheless, it's important to honor that the fn is invoked in case the |
| 384 | // calling function is waiting e.g. on a channel send or a WaitGroup. in |
| 385 | // such a case, fn should immediately exit because ctx.Err() will return |
| 386 | // ErrGenerationEnded. |
| 387 | if g.closed { |
| 388 | go fn(genCtx{g}) |
| 389 | return |
| 390 | } |
| 391 | |
| 392 | // register that there is one more go routine that's part of this gen. |
| 393 | g.routines++ |
| 394 | |
| 395 | go func() { |
| 396 | fn(genCtx{g}) |
| 397 | g.lock.Lock() |
| 398 | // shut down the generation as soon as one function exits. this is |
| 399 | // different from close() in that it doesn't wait for all go routines in |
| 400 | // the generation to exit. |
| 401 | if !g.closed { |
| 402 | close(g.done) |
| 403 | g.closed = true |
| 404 | } |
| 405 | g.routines-- |
| 406 | // if this was the last go routine in the generation, close the joined |
| 407 | // chan so that close() can exit if it's waiting. |
| 408 | if g.routines == 0 { |
| 409 | close(g.joined) |
| 410 | } |
| 411 | g.lock.Unlock() |
| 412 | }() |
| 413 | } |
| 414 | |
| 415 | // CommitOffsets commits the provided topic+partition+offset combos to the |
| 416 | // consumer group coordinator. This can be used to reset the consumer to |
no outgoing calls