ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition.
(topic string, partition int32, offset int64)
| 209 | // and be fully consumed once the mock consumer is closed if ExpectMessagesDrainedOnClose or |
| 210 | // ExpectErrorsDrainedOnClose have been called. |
| 211 | func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer { |
| 212 | c.l.Lock() |
| 213 | defer c.l.Unlock() |
| 214 | |
| 215 | if c.partitionConsumers[topic] == nil { |
| 216 | c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer) |
| 217 | } |
| 218 | |
| 219 | if c.partitionConsumers[topic][partition] == nil { |
| 220 | highWatermarkOffset := offset |
| 221 | if offset == sarama.OffsetOldest { |
| 222 | highWatermarkOffset = 0 |
| 223 | } |
| 224 | |
| 225 | consumer := &PartitionConsumer{ |
| 226 | t: c.t, |
| 227 | topic: topic, |
| 228 | partition: partition, |
| 229 | offset: offset, |
| 230 | messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), |
| 231 | suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), |
| 232 | errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), |
| 233 | } |
| 234 | consumer.highWaterMarkOffset.Store(highWatermarkOffset) |
| 235 | c.partitionConsumers[topic][partition] = consumer |
| 236 | } |
| 237 | |
| 238 | return c.partitionConsumers[topic][partition] |
| 239 | } |
| 240 | |
| 241 | /////////////////////////////////////////////////// |
| 242 | // PartitionConsumer mock type |
no outgoing calls