MCPcopy
hub / github.com/IBM/sarama / TestFuncConsumerGroupPartitioning

Function TestFuncConsumerGroupPartitioning

functional_consumer_group_test.go:20–53  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

18)
19
// TestFuncConsumerGroupPartitioning is a functional test that starts two
// consumer-group members (M1, then M2) under the same group ID and checks the
// claim and handler counts each member reports as the other joins and leaves.
20func TestFuncConsumerGroupPartitioning(t *testing.T) {
21 t.Parallel()
22 checkKafkaVersion(t, "0.10.2") // NOTE(review): presumably gates the test on a minimum broker version — confirm against the helper
23 setupFunctionalTest(t)
24 defer teardownFunctionalTest(t)
25
26 groupID := testFuncConsumerGroupID(t)
27
28 // start M1 with no extra topic arguments; it is then expected to hold 4 claims on test.4 and 4 handlers
29 m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil)
30 defer m1.Stop() // deferred in addition to the explicit shutdown below; presumably a safety net if a wait aborts the test early
31 m1.WaitForState(2) // NOTE(review): the meaning of state 2 is defined by the member helper — confirm
32 m1.WaitForClaims(map[string]int{"test.4": 4})
33 m1.WaitForHandlers(4)
34
35 // start M2 in the same group, passing "test.1" and "test.4" (presumably its topic subscriptions — confirm)
36 m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil, "test.1", "test.4")
37 defer m2.Stop()
38 m2.WaitForState(2)
39
40 // assert that claims are shared among both members: test.4 is split 2/2, and M2 additionally holds the single test.1 claim
41 m1.WaitForClaims(map[string]int{"test.4": 2})
42 m1.WaitForHandlers(2)
43 m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 2})
44 m2.WaitForHandlers(3)
45
46 // shutdown M1, wait for M2 to take over all 4 test.4 claims (5 handlers in total, counting test.1)
47 m1.AssertCleanShutdown()
48 m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 4})
49 m2.WaitForHandlers(5)
50
51 // shutdown M2, the last remaining member, via AssertCleanShutdown
52 m2.AssertCleanShutdown()
53}
54
55func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {
56 t.Parallel()

Callers

nothing calls this directly

Calls 10

checkKafkaVersion — Function · 0.85
setupFunctionalTest — Function · 0.85
teardownFunctionalTest — Function · 0.85
testFuncConsumerGroupID — Function · 0.85
Stop — Method · 0.80
WaitForState — Method · 0.80
WaitForClaims — Method · 0.80
WaitForHandlers — Method · 0.80
AssertCleanShutdown — Method · 0.80

Tested by

no test coverage detected