()
| 50 | } |
| 51 | |
| 52 | func main() { |
| 53 | keepRunning := true |
| 54 | log.Println("Starting a new Sarama consumer") |
| 55 | |
| 56 | if verbose { |
| 57 | sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) |
| 58 | } |
| 59 | |
| 60 | version, err := sarama.ParseKafkaVersion(version) |
| 61 | if err != nil { |
| 62 | log.Panicf("Error parsing Kafka version: %v", err) |
| 63 | } |
| 64 | |
| 65 | /** |
| 66 | * Construct a new Sarama configuration. |
| 67 | * The Kafka cluster version has to be defined before the consumer/producer is initialized. |
| 68 | */ |
| 69 | config := sarama.NewConfig() |
| 70 | config.Version = version |
| 71 | |
| 72 | switch assignor { |
| 73 | case "sticky": |
| 74 | config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()} |
| 75 | case "roundrobin": |
| 76 | config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} |
| 77 | case "range": |
| 78 | config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} |
| 79 | default: |
| 80 | log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) |
| 81 | } |
| 82 | |
| 83 | if oldest { |
| 84 | config.Consumer.Offsets.Initial = sarama.OffsetOldest |
| 85 | } |
| 86 | |
| 87 | /** |
| 88 | * Set up a new Sarama consumer group. |
| 89 | */ |
| 90 | consumer := Consumer{ |
| 91 | ready: make(chan bool), |
| 92 | } |
| 93 | |
| 94 | ctx, cancel := context.WithCancel(context.Background()) |
| 95 | client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) |
| 96 | if err != nil { |
| 97 | log.Panicf("Error creating consumer group client: %v", err) |
| 98 | } |
| 99 | |
| 100 | consumptionIsPaused := false |
| 101 | wg := &sync.WaitGroup{} |
| 102 | wg.Add(1) |
| 103 | go func() { |
| 104 | defer wg.Done() |
| 105 | for { |
| 106 | // `Consume` should be called inside an infinite loop: when a |
| 107 | // server-side rebalance happens, the consumer session needs to be |
| 108 | // recreated to get the new claims. |
| 109 | if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil { |
nothing calls this directly
no test coverage detected