hub / github.com/IBM/sarama / ConsumePartition

Method ConsumePartition

mocks/consumer.go:48–68

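ConsumePartition implements the ConsumePartition method of the sarama.Consumer interface. Before consuming a partition, you must set expectations on it with ExpectConsumePartition; otherwise the call reports a test failure and returns errOutOfExpectations. Each partition can be consumed only once per consumer: a second call returns a sarama.ConfigurationError. If the expectation was registered with a specific offset rather than AnyOffset, a mismatched offset is reported as a test failure, but the partition consumer is still returned.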

(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

Source from the content-addressed store, hash-verified

// Before you can start consuming a partition, you have to set expectations on it using
// ExpectConsumePartition. You can only consume a partition once per consumer.
func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
	c.l.Lock()
	defer c.l.Unlock()

	if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
		c.t.Errorf("No expectations set for %s/%d", topic, partition)
		return nil, errOutOfExpectations
	}

	pc := c.partitionConsumers[topic][partition]
	if pc.consumed {
		return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
	}

	if pc.offset != AnyOffset && pc.offset != offset {
		c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
	}

	pc.consumed = true
	return pc, nil
}

Implementers 2

consumer · consumer.go
Consumer · mocks/consumer.go

Calls 2

ConfigurationError · TypeAlias · 0.92
Errorf · Method · 0.65