merge updates the offsetStash with the offsets from the provided messages.
(commits []commit)
| 170 | |
| 171 | // merge updates the offsetStash with the offsets from the provided messages. |
| 172 | func (o offsetStash) merge(commits []commit) { |
| 173 | for _, c := range commits { |
| 174 | offsetsByPartition, ok := o[c.topic] |
| 175 | if !ok { |
| 176 | offsetsByPartition = map[int]int64{} |
| 177 | o[c.topic] = offsetsByPartition |
| 178 | } |
| 179 | |
| 180 | if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset { |
| 181 | offsetsByPartition[c.partition] = c.offset |
| 182 | } |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | // reset clears the contents of the offsetStash. |
| 187 | func (o offsetStash) reset() { |
no outgoing calls