| 109 | func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 } |
| 110 | |
| 111 | func (r *Reader) unsubscribe() { |
| 112 | r.cancel() |
| 113 | r.join.Wait() |
| 114 | // it would be interesting to drain the r.msgs channel at this point since |
| 115 | // it will contain buffered messages for partitions that may not be |
| 116 | // re-assigned to this reader in the next consumer group generation. |
| 117 | // however, draining the channel could race with the client calling |
| 118 | // ReadMessage, which could result in messages delivered and/or committed |
| 119 | // with gaps in the offset. for now, we will err on the side of caution and |
| 120 | // potentially have those messages be reprocessed in the next generation by |
| 121 | // another consumer to avoid such a race. |
| 122 | } |
| 123 | |
| 124 | func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) { |
| 125 | offsets := make(map[topicPartition]int64) |