Consumer interface implementation ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.
(topic string, partition int32, offset int64)
| 46 | // Before you can start consuming a partition, you have to set expectations on it using |
| 47 | // ExpectConsumePartition. You can only consume a partition once per consumer. |
| 48 | func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { |
| 49 | c.l.Lock() |
| 50 | defer c.l.Unlock() |
| 51 | |
| 52 | if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil { |
| 53 | c.t.Errorf("No expectations set for %s/%d", topic, partition) |
| 54 | return nil, errOutOfExpectations |
| 55 | } |
| 56 | |
| 57 | pc := c.partitionConsumers[topic][partition] |
| 58 | if pc.consumed { |
| 59 | return nil, sarama.ConfigurationError("The topic/partition is already being consumed") |
| 60 | } |
| 61 | |
| 62 | if pc.offset != AnyOffset && pc.offset != offset { |
| 63 | c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset) |
| 64 | } |
| 65 | |
| 66 | pc.consumed = true |
| 67 | return pc, nil |
| 68 | } |
| 69 | |
| 70 | // Topics returns a list of topics, as registered with SetTopicMetadata |
| 71 | func (c *Consumer) Topics() ([]string, error) { |