Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.
()
| 119 | // Close implements the Close method from the sarama.Consumer interface. It will close |
| 120 | // all registered PartitionConsumer instances. |
| 121 | func (c *Consumer) Close() error { |
| 122 | c.l.Lock() |
| 123 | defer c.l.Unlock() |
| 124 | |
| 125 | for _, partitions := range c.partitionConsumers { |
| 126 | for _, partitionConsumer := range partitions { |
| 127 | _ = partitionConsumer.Close() |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | return nil |
| 132 | } |
| 133 | |
| 134 | // Pause implements Consumer. |
| 135 | func (c *Consumer) Pause(topicPartitions map[string][]int32) { |