ListOffsets fans out across the partition leaders to fetch offsets in parallel. Per-partition results may carry their own Err (e.g. NotLeaderForPartition, UnknownTopicOrPartition) when metadata is stale; the caller can refresh metadata via the underlying client and retry those partitions if needed.
(partitions map[string]map[int32]int64, options *ListOffsetsOptions)
| 40 | // metadata via the underlying client and retry those partitions if needed. The |
| 41 | // retry loop here only covers transport-level failures. |
| 42 | func (ca *clusterAdmin) ListOffsets(partitions map[string]map[int32]int64, options *ListOffsetsOptions) (map[string]map[int32]*OffsetResult, error) { |
| 43 | type topicPartition struct { |
| 44 | topic string |
| 45 | partition int32 |
| 46 | } |
| 47 | |
| 48 | type brokerOffsetRequest struct { |
| 49 | request *OffsetRequest |
| 50 | partitions []topicPartition |
| 51 | } |
| 52 | |
| 53 | type brokerOffsetResult struct { |
| 54 | result map[topicPartition]*OffsetResult |
| 55 | err error |
| 56 | } |
| 57 | |
| 58 | if len(partitions) == 0 { |
| 59 | return nil, ConfigurationError("no partitions provided") |
| 60 | } |
| 61 | |
| 62 | if options == nil { |
| 63 | options = &ListOffsetsOptions{} |
| 64 | } |
| 65 | |
| 66 | allResults := make(map[string]map[int32]*OffsetResult, len(partitions)) |
| 67 | setResult := func(topic string, partition int32, result *OffsetResult) { |
| 68 | if allResults[topic] == nil { |
| 69 | allResults[topic] = make(map[int32]*OffsetResult) |
| 70 | } |
| 71 | allResults[topic][partition] = result |
| 72 | } |
| 73 | |
| 74 | requests := make(map[*Broker]*brokerOffsetRequest) |
| 75 | for topic, topicOffsets := range partitions { |
| 76 | for partition, offsetQuery := range topicOffsets { |
| 77 | broker, _, err := ca.client.LeaderAndEpoch(topic, partition) |
| 78 | if err != nil { |
| 79 | setResult(topic, partition, &OffsetResult{Err: err}) |
| 80 | continue |
| 81 | } |
| 82 | |
| 83 | req := requests[broker] |
| 84 | if req == nil { |
| 85 | req = &brokerOffsetRequest{ |
| 86 | request: NewOffsetRequest(ca.conf.Version), |
| 87 | } |
| 88 | req.request.IsolationLevel = options.IsolationLevel |
| 89 | requests[broker] = req |
| 90 | } |
| 91 | req.request.AddBlock(topic, partition, offsetQuery, 1) |
| 92 | req.partitions = append(req.partitions, topicPartition{topic: topic, partition: partition}) |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | if len(requests) == 0 { |
| 97 | return allResults, nil |
| 98 | } |
| 99 |
Nothing calls ListOffsets directly.
No test coverage was detected for it.