MCPcopy
hub / github.com/IBM/sarama / ExampleConsumerGroup

Function ExampleConsumerGroup

consumer_group_example_test.go:22–54  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

20}
21
22func ExampleConsumerGroup() {
23 config := NewTestConfig()
24 config.Version = V2_0_0_0 // specify appropriate version
25 config.Consumer.Return.Errors = true
26
27 group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
28 if err != nil {
29 panic(err)
30 }
31 defer func() { _ = group.Close() }()
32
33 // Track errors
34 go func() {
35 for err := range group.Errors() {
36 fmt.Println("ERROR", err)
37 }
38 }()
39
40 // Iterate over consumer sessions.
41 ctx := context.Background()
42 for {
43 topics := []string{"my-topic"}
44 handler := exampleConsumerGroupHandler{}
45
46 // `Consume` should be called inside an infinite loop, when a
47 // server-side rebalance happens, the consumer session will need to be
48 // recreated to get the new claims
49 err := group.Consume(ctx, topics, handler)
50 if err != nil {
51 panic(err)
52 }
53 }
54}

Callers

nothing calls this directly

Calls 6

CloseMethod · 0.95
ErrorsMethod · 0.95
ConsumeMethod · 0.95
NewConsumerGroupFunction · 0.85
NewTestConfigFunction · 0.70
PrintlnMethod · 0.65

Tested by

no test coverage detected