MCPcopy
hub / github.com/IBM/sarama / main

Function main

examples/exactly_once/main.go:60–177  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

58}
59
60func main() {
61 keepRunning := true
62 log.Println("Starting a new Sarama consumer")
63
64 if verbose {
65 sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
66 }
67
68 version, err := sarama.ParseKafkaVersion(version)
69 if err != nil {
70 log.Panicf("Error parsing Kafka version: %v", err)
71 }
72
73 /**
74 * Construct a new Sarama configuration.
75 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
76 */
77 config := sarama.NewConfig()
78 config.Version = version
79
80 switch assignor {
81 case "sticky":
82 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
83 case "roundrobin":
84 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
85 case "range":
86 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
87 default:
88 log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
89 }
90
91 if oldest {
92 config.Consumer.Offsets.Initial = sarama.OffsetOldest
93 }
94
95 config.Consumer.IsolationLevel = sarama.ReadCommitted
96 config.Consumer.Offsets.AutoCommit.Enable = false
97
98 producerProvider := newProducerProvider(strings.Split(brokers, ","), func() *sarama.Config {
99 producerConfig := sarama.NewConfig()
100 producerConfig.Version = version
101
102 producerConfig.Net.MaxOpenRequests = 1
103 producerConfig.Producer.RequiredAcks = sarama.WaitForAll
104 producerConfig.Producer.Idempotent = true
105 producerConfig.Producer.Transaction.ID = "sarama"
106 return producerConfig
107 })
108
109 /**
110 * Setup a new Sarama consumer group
111 */
112 consumer := Consumer{
113 groupId: group,
114 brokers: strings.Split(brokers, ","),
115 producerProvider: producerProvider,
116 ready: make(chan bool),
117 }

Callers

nothing calls this directly

Calls 15

Consume (Method) · 0.95
Close (Method) · 0.95
ParseKafkaVersion (Function) · 0.92
NewConfig (Function) · 0.92
NewBalanceStrategySticky (Function) · 0.92
NewBalanceStrategyRange (Function) · 0.92
NewConsumerGroup (Function) · 0.92
Is (Method) · 0.80
newProducerProvider (Function) · 0.70
toggleConsumptionFlow (Function) · 0.70
Println (Method) · 0.65

Tested by

no test coverage detected