MCPcopy
hub / github.com/IBM/sarama / runTestFuncConsumerGroupMemberWithConfig

Function runTestFuncConsumerGroupMemberWithConfig

functional_consumer_group_test.go:428–459  ·  view source on GitHub ↗
(
	t *testing.T,
	config *Config,
	groupID string,
	maxMessages int32,
	sink *testFuncConsumerGroupSink,
	topics ...string,
)

Source from the content-addressed store, hash-verified

426}
427
428func runTestFuncConsumerGroupMemberWithConfig(
429 t *testing.T,
430 config *Config,
431 groupID string,
432 maxMessages int32,
433 sink *testFuncConsumerGroupSink,
434 topics ...string,
435) *testFuncConsumerGroupMember {
436 t.Helper()
437
438 group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config)
439 if err != nil {
440 t.Fatal(err)
441 return nil
442 }
443
444 if len(topics) == 0 {
445 topics = []string{"test.4"}
446 }
447
448 member := &testFuncConsumerGroupMember{
449 ConsumerGroup: group,
450 clientID: config.ClientID,
451 claims: make(map[string]int),
452 isCapped: maxMessages != 0,
453 sink: sink,
454 t: t,
455 }
456 member.maxMessages.Store(maxMessages)
457 go member.loop(topics)
458 return member
459}
460
461func (m *testFuncConsumerGroupMember) AssertCleanShutdown() {
462 m.t.Helper()

Calls 4

loopMethod · 0.95
NewConsumerGroupFunction · 0.85
HelperMethod · 0.80
FatalMethod · 0.80

Tested by

no test coverage detected