Method ExpectConsumePartition

mocks/consumer.go:211–239

ExpectConsumePartition registers a topic/partition pair and returns its PartitionConsumer, so you can set expectations on it using method chaining. Registering the same topic/partition again returns the existing PartitionConsumer unchanged, so the offset from the first call is kept. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition.

(topic string, partition int32, offset int64)

Source

// and be fully consumed once the mock consumer is closed if ExpectMessagesDrainedOnClose or
// ExpectErrorsDrainedOnClose have been called.
func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
	c.l.Lock()
	defer c.l.Unlock()

	if c.partitionConsumers[topic] == nil {
		c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
	}

	if c.partitionConsumers[topic][partition] == nil {
		highWatermarkOffset := offset
		if offset == sarama.OffsetOldest {
			highWatermarkOffset = 0
		}

		consumer := &PartitionConsumer{
			t:                  c.t,
			topic:              topic,
			partition:          partition,
			offset:             offset,
			messages:           make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
			suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
			errors:             make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
		}
		consumer.highWaterMarkOffset.Store(highWatermarkOffset)
		c.partitionConsumers[topic][partition] = consumer
	}

	return c.partitionConsumers[topic][partition]
}

///////////////////////////////////////////////////
// PartitionConsumer mock type
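
The registration logic in the listing can be illustrated with a standard-library-only sketch. It is not the sarama implementation: `registry`, `partitionStub`, `expect`, and `offsetOldest` are hypothetical stand-ins for `Consumer`, `PartitionConsumer`, `ExpectConsumePartition`, and `sarama.OffsetOldest` (whose value is -2), and the channels are left out. It shows the two behaviors visible in the source: the first call for a topic/partition creates the entry, and an `OffsetOldest` request starts the high watermark at 0.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetOldest mirrors the value of sarama.OffsetOldest (-2). It is
// redeclared here so the sketch builds without the sarama module.
const offsetOldest int64 = -2

// partitionStub is a hypothetical, reduced stand-in for mocks.PartitionConsumer.
type partitionStub struct {
	topic         string
	partition     int32
	offset        int64
	highWatermark int64
}

// registry is a hypothetical stand-in for the mock Consumer: a nested
// topic -> partition map guarded by a mutex.
type registry struct {
	mu         sync.Mutex
	partitions map[string]map[int32]*partitionStub
}

func newRegistry() *registry {
	return &registry{partitions: make(map[string]map[int32]*partitionStub)}
}

// expect follows the shape of ExpectConsumePartition: create the inner map
// and the entry lazily, and return the existing entry on repeated calls.
func (r *registry) expect(topic string, partition int32, offset int64) *partitionStub {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.partitions[topic] == nil {
		r.partitions[topic] = make(map[int32]*partitionStub)
	}
	if r.partitions[topic][partition] == nil {
		// An "oldest" request starts the high watermark at 0; any other
		// offset is used as the starting high watermark as-is.
		hwm := offset
		if offset == offsetOldest {
			hwm = 0
		}
		r.partitions[topic][partition] = &partitionStub{
			topic:         topic,
			partition:     partition,
			offset:        offset,
			highWatermark: hwm,
		}
	}
	return r.partitions[topic][partition]
}

func main() {
	r := newRegistry()

	first := r.expect("events", 0, offsetOldest)
	second := r.expect("events", 0, 42) // already registered: offset 42 is ignored

	fmt.Println(first == second)     // true
	fmt.Println(second.offset)       // -2
	fmt.Println(first.highWatermark) // 0

	other := r.expect("events", 1, 42)
	fmt.Println(other.highWatermark) // 42
}
```

Because the second call returns the existing entry, expectations chained onto either return value apply to the same object. A test that needs a different starting offset for the same topic/partition would have to use a fresh mock consumer.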

Implementers 2

consumer  ·  consumer.go
Consumer  ·  mocks/consumer.go

Calls

no outgoing calls