MCPcopy
hub / github.com/IBM/sarama / ConsumePartition

Method ConsumePartition

consumer.go:159–205  ·  view source on GitHub ↗
(topic string, partition int32, offset int64)

Source from the content-addressed store, hash-verified

157}
158
159func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
160 child := &partitionConsumer{
161 consumer: c,
162 conf: c.conf,
163 topic: topic,
164 partition: partition,
165 messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
166 errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
167 feeder: make(chan *partitionConsumerResponse, 1),
168 leaderEpoch: invalidLeaderEpoch,
169 preferredReadReplica: invalidPreferredReplicaID,
170 trigger: make(chan none, 1),
171 dying: make(chan none),
172 dispatcherStop: make(chan none),
173 fetchSize: c.conf.Consumer.Fetch.Default,
174 }
175
176 if err := child.chooseStartingOffset(offset); err != nil {
177 return nil, err
178 }
179
180 leader, epoch, err := c.client.LeaderAndEpoch(child.topic, child.partition)
181 if err != nil {
182 return nil, err
183 }
184
185 if err := c.addChild(child); err != nil {
186 return nil, err
187 }
188
189 go withRecover(child.dispatcher)
190 go withRecover(child.responseFeeder)
191
192 child.leaderEpoch = epoch
193 for {
194 child.broker = c.refBrokerConsumer(leader)
195 child.brokerSubscription = newBrokerSubscription(child)
196 if child.broker.queueSubscription(child.brokerSubscription) {
197 break
198 }
199
200 child.brokerSubscription.release()
201 c.unrefBrokerConsumer(child.broker)
202 }
203
204 return child, nil
205}
206
207func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
208 c.lock.Lock()

Callers

nothing calls this directly

Calls 9

chooseStartingOffsetMethod · 0.95
addChildMethod · 0.95
refBrokerConsumerMethod · 0.95
unrefBrokerConsumerMethod · 0.95
withRecoverFunction · 0.85
newBrokerSubscriptionFunction · 0.85
queueSubscriptionMethod · 0.80
LeaderAndEpochMethod · 0.65
releaseMethod · 0.45

Tested by

no test coverage detected