MCPcopy
hub / github.com/segmentio/kafka-go / LookupPartition

Method LookupPartition

dialer.go:162–209  ·  view source on GitHub ↗

LookupPartition searches for the description of the specified partition ID.

(ctx context.Context, network string, address string, topic string, partition int)

Source from the content-addressed store, hash-verified

160
161// LookupPartition searches for the description of specified partition id.
162func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error) {
163 c, err := d.DialContext(ctx, network, address)
164 if err != nil {
165 return Partition{}, err
166 }
167 defer c.Close()
168
169 brkch := make(chan Partition, 1)
170 errch := make(chan error, 1)
171
172 go func() {
173 for attempt := 0; true; attempt++ {
174 if attempt != 0 {
175 if !sleep(ctx, backoff(attempt, 100*time.Millisecond, 10*time.Second)) {
176 errch <- ctx.Err()
177 return
178 }
179 }
180
181 partitions, err := c.ReadPartitions(topic)
182 if err != nil {
183 if isTemporary(err) {
184 continue
185 }
186 errch <- err
187 return
188 }
189
190 for _, p := range partitions {
191 if p.ID == partition {
192 brkch <- p
193 return
194 }
195 }
196 }
197
198 errch <- UnknownTopicOrPartition
199 }()
200
201 var prt Partition
202 select {
203 case prt = <-brkch:
204 case err = <-errch:
205 case <-ctx.Done():
206 err = ctx.Err()
207 }
208 return prt, err
209}
210
211// LookupPartitions returns the list of partitions that exist for the given topic.
212func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) {

Callers 3

DialLeaderMethod · 0.95
LookupLeaderMethod · 0.95
LookupPartitionFunction · 0.80

Calls 8

DialContextMethod · 0.95
sleepFunction · 0.85
backoffFunction · 0.85
isTemporaryFunction · 0.85
DoneMethod · 0.80
CloseMethod · 0.45
ErrMethod · 0.45
ReadPartitionsMethod · 0.45

Tested by

no test coverage detected