MCPcopy
hub / github.com/segmentio/kafka-go / subscribe

Method subscribe

reader.go:124–143  ·  view source on GitHub ↗
(allAssignments map[string][]PartitionAssignment)

Source from the content-addressed store, hash-verified

122}
123
124func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
125 offsets := make(map[topicPartition]int64)
126 for topic, assignments := range allAssignments {
127 for _, assignment := range assignments {
128 key := topicPartition{
129 topic: topic,
130 partition: int32(assignment.ID),
131 }
132 offsets[key] = assignment.Offset
133 }
134 }
135
136 r.mutex.Lock()
137 r.start(offsets)
138 r.mutex.Unlock()
139
140 r.withLogger(func(l Logger) {
141 l.Printf("subscribed to topics and partitions: %+v", offsets)
142 })
143}
144
145// commitOffsetsWithRetry attempts to commit the specified offsets and retries
146// up to the specified number of times.

Callers 1

runMethod · 0.95

Calls 3

startMethod · 0.95
withLoggerMethod · 0.95
PrintfMethod · 0.65

Tested by

no test coverage detected