(topic string, partition int32, offset int64)
| 157 | } |
| 158 | |
| 159 | func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) { |
| 160 | child := &partitionConsumer{ |
| 161 | consumer: c, |
| 162 | conf: c.conf, |
| 163 | topic: topic, |
| 164 | partition: partition, |
| 165 | messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), |
| 166 | errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), |
| 167 | feeder: make(chan *partitionConsumerResponse, 1), |
| 168 | leaderEpoch: invalidLeaderEpoch, |
| 169 | preferredReadReplica: invalidPreferredReplicaID, |
| 170 | trigger: make(chan none, 1), |
| 171 | dying: make(chan none), |
| 172 | dispatcherStop: make(chan none), |
| 173 | fetchSize: c.conf.Consumer.Fetch.Default, |
| 174 | } |
| 175 | |
| 176 | if err := child.chooseStartingOffset(offset); err != nil { |
| 177 | return nil, err |
| 178 | } |
| 179 | |
| 180 | leader, epoch, err := c.client.LeaderAndEpoch(child.topic, child.partition) |
| 181 | if err != nil { |
| 182 | return nil, err |
| 183 | } |
| 184 | |
| 185 | if err := c.addChild(child); err != nil { |
| 186 | return nil, err |
| 187 | } |
| 188 | |
| 189 | go withRecover(child.dispatcher) |
| 190 | go withRecover(child.responseFeeder) |
| 191 | |
| 192 | child.leaderEpoch = epoch |
| 193 | for { |
| 194 | child.broker = c.refBrokerConsumer(leader) |
| 195 | child.brokerSubscription = newBrokerSubscription(child) |
| 196 | if child.broker.queueSubscription(child.brokerSubscription) { |
| 197 | break |
| 198 | } |
| 199 | |
| 200 | child.brokerSubscription.release() |
| 201 | c.unrefBrokerConsumer(child.broker) |
| 202 | } |
| 203 | |
| 204 | return child, nil |
| 205 | } |
| 206 | |
| 207 | func (c *consumer) HighWaterMarks() map[string]map[int32]int64 { |
| 208 | c.lock.Lock() |
nothing calls this directly
no test coverage detected