MCPcopy
hub / github.com/IBM/sarama / main

Function main

examples/consumergroup/main.go:52–149  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

50}
51
52func main() {
53 keepRunning := true
54 log.Println("Starting a new Sarama consumer")
55
56 if verbose {
57 sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
58 }
59
60 version, err := sarama.ParseKafkaVersion(version)
61 if err != nil {
62 log.Panicf("Error parsing Kafka version: %v", err)
63 }
64
65 /**
66 * Construct a new Sarama configuration.
67 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
68 */
69 config := sarama.NewConfig()
70 config.Version = version
71
72 switch assignor {
73 case "sticky":
74 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
75 case "roundrobin":
76 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
77 case "range":
78 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
79 default:
80 log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
81 }
82
83 if oldest {
84 config.Consumer.Offsets.Initial = sarama.OffsetOldest
85 }
86
87 /**
88 * Setup a new Sarama consumer group
89 */
90 consumer := Consumer{
91 ready: make(chan bool),
92 }
93
94 ctx, cancel := context.WithCancel(context.Background())
95 client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
96 if err != nil {
97 log.Panicf("Error creating consumer group client: %v", err)
98 }
99
100 consumptionIsPaused := false
101 wg := &sync.WaitGroup{}
102 wg.Add(1)
103 go func() {
104 defer wg.Done()
105 for {
106 // `Consume` should be called inside an infinite loop, when a
107 // server-side rebalance happens, the consumer session will need to be
108 // recreated to get the new claims
109 if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {

Callers

nothing calls this directly

Calls 13

ConsumeMethod · 0.95
CloseMethod · 0.95
ParseKafkaVersionFunction · 0.92
NewConfigFunction · 0.92
NewBalanceStrategyStickyFunction · 0.92
NewBalanceStrategyRangeFunction · 0.92
NewConsumerGroupFunction · 0.92
IsMethod · 0.80
toggleConsumptionFlowFunction · 0.70
PrintlnMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected