( t *testing.T, config *Config, groupID string, maxMessages int32, sink *testFuncConsumerGroupSink, topics ...string, )
| 426 | } |
| 427 | |
| 428 | func runTestFuncConsumerGroupMemberWithConfig( |
| 429 | t *testing.T, |
| 430 | config *Config, |
| 431 | groupID string, |
| 432 | maxMessages int32, |
| 433 | sink *testFuncConsumerGroupSink, |
| 434 | topics ...string, |
| 435 | ) *testFuncConsumerGroupMember { |
| 436 | t.Helper() |
| 437 | |
| 438 | group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config) |
| 439 | if err != nil { |
| 440 | t.Fatal(err) |
| 441 | return nil |
| 442 | } |
| 443 | |
| 444 | if len(topics) == 0 { |
| 445 | topics = []string{"test.4"} |
| 446 | } |
| 447 | |
| 448 | member := &testFuncConsumerGroupMember{ |
| 449 | ConsumerGroup: group, |
| 450 | clientID: config.ClientID, |
| 451 | claims: make(map[string]int), |
| 452 | isCapped: maxMessages != 0, |
| 453 | sink: sink, |
| 454 | t: t, |
| 455 | } |
| 456 | member.maxMessages.Store(maxMessages) |
| 457 | go member.loop(topics) |
| 458 | return member |
| 459 | } |
| 460 | |
| 461 | func (m *testFuncConsumerGroupMember) AssertCleanShutdown() { |
| 462 | m.t.Helper() |
no test coverage detected