()
| 58 | } |
| 59 | |
| 60 | func main() { |
| 61 | keepRunning := true |
| 62 | log.Println("Starting a new Sarama consumer") |
| 63 | |
| 64 | if verbose { |
| 65 | sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) |
| 66 | } |
| 67 | |
| 68 | version, err := sarama.ParseKafkaVersion(version) |
| 69 | if err != nil { |
| 70 | log.Panicf("Error parsing Kafka version: %v", err) |
| 71 | } |
| 72 | |
| 73 | /** |
| 74 | * Construct a new Sarama configuration. |
| 75 | * The Kafka cluster version has to be defined before the consumer/producer is initialized. |
| 76 | */ |
| 77 | config := sarama.NewConfig() |
| 78 | config.Version = version |
| 79 | |
| 80 | switch assignor { |
| 81 | case "sticky": |
| 82 | config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()} |
| 83 | case "roundrobin": |
| 84 | config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} |
| 85 | case "range": |
| 86 | config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} |
| 87 | default: |
| 88 | log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) |
| 89 | } |
| 90 | |
| 91 | if oldest { |
| 92 | config.Consumer.Offsets.Initial = sarama.OffsetOldest |
| 93 | } |
| 94 | |
| 95 | config.Consumer.IsolationLevel = sarama.ReadCommitted |
| 96 | config.Consumer.Offsets.AutoCommit.Enable = false |
| 97 | |
| 98 | producerProvider := newProducerProvider(strings.Split(brokers, ","), func() *sarama.Config { |
| 99 | producerConfig := sarama.NewConfig() |
| 100 | producerConfig.Version = version |
| 101 | |
| 102 | producerConfig.Net.MaxOpenRequests = 1 |
| 103 | producerConfig.Producer.RequiredAcks = sarama.WaitForAll |
| 104 | producerConfig.Producer.Idempotent = true |
| 105 | producerConfig.Producer.Transaction.ID = "sarama" |
| 106 | return producerConfig |
| 107 | }) |
| 108 | |
| 109 | /** |
| 110 | * Set up a new Sarama consumer group |
| 111 | */ |
| 112 | consumer := Consumer{ |
| 113 | groupId: group, |
| 114 | brokers: strings.Split(brokers, ","), |
| 115 | producerProvider: producerProvider, |
| 116 | ready: make(chan bool), |
| 117 | } |
nothing calls this directly
no test coverage detected