MCPcopy
hub / github.com/IBM/sarama / main

Function main

examples/consumer_load_aware/main.go:45–103  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

43}
44
45func main() {
46 if verbose {
47 sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
48 }
49
50 v, err := sarama.ParseKafkaVersion(version)
51 if err != nil {
52 log.Panicf("Error parsing Kafka version: %v", err)
53 }
54
55 config := sarama.NewConfig()
56 config.Version = v
57 config.Consumer.Offsets.Initial = sarama.OffsetOldest
58
59 consumer := &Consumer{ready: make(chan bool)}
60
61 // Plug in the load-aware sticky strategy. The observer is called once per
62 // JoinGroup cycle to capture a fresh sample of local load.
63 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
64 NewLoadAwareSticky(consumer.loadSample),
65 }
66
67 client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
68 if err != nil {
69 log.Panicf("Error creating consumer group client: %v", err)
70 }
71
72 ctx, cancel := context.WithCancel(context.Background())
73 wg := &sync.WaitGroup{}
74 wg.Add(1)
75 go func() {
76 defer wg.Done()
77 for {
78 if err := client.Consume(ctx, strings.Split(topics, ","), consumer); err != nil {
79 if errors.Is(err, sarama.ErrClosedConsumerGroup) {
80 return
81 }
82 log.Panicf("Error from consumer: %v", err)
83 }
84 if ctx.Err() != nil {
85 return
86 }
87 consumer.ready = make(chan bool)
88 }
89 }()
90
91 <-consumer.ready
92 log.Println("load-aware sticky consumer running")
93
94 sigterm := make(chan os.Signal, 1)
95 signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
96 <-sigterm
97
98 cancel()
99 wg.Wait()
100 if err = client.Close(); err != nil {
101 log.Panicf("Error closing client: %v", err)
102 }

Callers

nothing calls this directly

Calls 10

Consume (Method) · 0.95
Close (Method) · 0.95
ParseKafkaVersion (Function) · 0.92
NewConfig (Function) · 0.92
NewConsumerGroup (Function) · 0.92
NewLoadAwareSticky (Function) · 0.85
Is (Method) · 0.80
Done (Method) · 0.65
Println (Method) · 0.65
Add (Method) · 0.45

Tested by

no test coverage detected