LookupPartition searches for the description of specified partition id.
(ctx context.Context, network string, address string, topic string, partition int)
| 160 | |
| 161 | // LookupPartition searches for the description of specified partition id. |
| 162 | func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error) { |
| 163 | c, err := d.DialContext(ctx, network, address) |
| 164 | if err != nil { |
| 165 | return Partition{}, err |
| 166 | } |
| 167 | defer c.Close() |
| 168 | |
| 169 | brkch := make(chan Partition, 1) |
| 170 | errch := make(chan error, 1) |
| 171 | |
| 172 | go func() { |
| 173 | for attempt := 0; true; attempt++ { |
| 174 | if attempt != 0 { |
| 175 | if !sleep(ctx, backoff(attempt, 100*time.Millisecond, 10*time.Second)) { |
| 176 | errch <- ctx.Err() |
| 177 | return |
| 178 | } |
| 179 | } |
| 180 | |
| 181 | partitions, err := c.ReadPartitions(topic) |
| 182 | if err != nil { |
| 183 | if isTemporary(err) { |
| 184 | continue |
| 185 | } |
| 186 | errch <- err |
| 187 | return |
| 188 | } |
| 189 | |
| 190 | for _, p := range partitions { |
| 191 | if p.ID == partition { |
| 192 | brkch <- p |
| 193 | return |
| 194 | } |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | errch <- UnknownTopicOrPartition |
| 199 | }() |
| 200 | |
| 201 | var prt Partition |
| 202 | select { |
| 203 | case prt = <-brkch: |
| 204 | case err = <-errch: |
| 205 | case <-ctx.Done(): |
| 206 | err = ctx.Err() |
| 207 | } |
| 208 | return prt, err |
| 209 | } |
| 210 | |
| 211 | // LookupPartitions returns the list of partitions that exist for the given topic. |
| 212 | func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) { |
no test coverage detected