(allAssignments map[string][]PartitionAssignment)
| 122 | } |
| 123 | |
| 124 | func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) { |
| 125 | offsets := make(map[topicPartition]int64) |
| 126 | for topic, assignments := range allAssignments { |
| 127 | for _, assignment := range assignments { |
| 128 | key := topicPartition{ |
| 129 | topic: topic, |
| 130 | partition: int32(assignment.ID), |
| 131 | } |
| 132 | offsets[key] = assignment.Offset |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | r.mutex.Lock() |
| 137 | r.start(offsets) |
| 138 | r.mutex.Unlock() |
| 139 | |
| 140 | r.withLogger(func(l Logger) { |
| 141 | l.Printf("subscribed to topics and partitions: %+v", offsets) |
| 142 | }) |
| 143 | } |
| 144 | |
| 145 | // commitOffsetsWithRetry attempts to commit the specified offsets and retries |
| 146 | // up to the specified number of times. |
no test coverage detected