hub / github.com/segmentio/kafka-go / unsubscribe

Method unsubscribe

reader.go:111–122
func (r *Reader) unsubscribe()

Source from the content-addressed store, hash-verified

109  func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }
110
111  func (r *Reader) unsubscribe() {
112      r.cancel()
113      r.join.Wait()
114      // it would be interesting to drain the r.msgs channel at this point since
115      // it will contain buffered messages for partitions that may not be
116      // re-assigned to this reader in the next consumer group generation.
117      // however, draining the channel could race with the client calling
118      // ReadMessage, which could result in messages delivered and/or committed
119      // with gaps in the offset. for now, we will err on the side of caution and
120      // potentially have those messages be reprocessed in the next generation by
121      // another consumer to avoid such a race.
122  }
123
124  func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
125      offsets := make(map[topicPartition]int64)
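
The cancel-then-wait shape of unsubscribe, and the decision to leave the message channel undrained, can be sketched in isolation. This is a minimal illustration, not the library's code: the consumer type, newConsumer, and the integer messages are hypothetical, and it assumes that cancel behaves like a context.CancelFunc and join like a sync.WaitGroup.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// consumer is a hypothetical stand-in for the Reader. Only the pieces needed
// to show the cancel-then-wait pattern are modeled.
type consumer struct {
	cancel context.CancelFunc
	join   sync.WaitGroup
	msgs   chan int
}

// newConsumer returns a consumer whose message channel buffers up to buf items.
func newConsumer(buf int) *consumer {
	return &consumer{msgs: make(chan int, buf)}
}

// subscribe starts n producer goroutines that push their id into msgs until
// the shared context is cancelled.
func (c *consumer) subscribe(n int) {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	for i := 0; i < n; i++ {
		c.join.Add(1)
		go func(id int) {
			defer c.join.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case c.msgs <- id:
				}
			}
		}(i)
	}
}

// unsubscribe signals every producer to stop, then blocks until all of them
// have returned. It deliberately does not drain msgs: whatever was buffered
// stays available to the next receive.
func (c *consumer) unsubscribe() {
	c.cancel()
	c.join.Wait()
}

func main() {
	c := newConsumer(4)
	c.subscribe(2)
	first := <-c.msgs // blocks until a producer has sent something
	c.unsubscribe()
	fmt.Println("first message came from producer", first)
	fmt.Println("messages still buffered:", len(c.msgs))
}
```

Because the buffer is left alone, the number of messages remaining after unsubscribe varies from run to run. Once join.Wait() returns, no producer is left to add more, so that count can no longer grow.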

Callers 1

run · Method · 0.95

Calls

no outgoing calls resolved (the body invokes r.cancel() and r.join.Wait())

Tested by

no test coverage detected