MCPcopy
hub / github.com/IBM/sarama / ListOffsets

Method ListOffsets

admin_offsets.go:42–153  ·  view source on GitHub ↗

ListOffsets fans out across the partition leaders to fetch offsets in parallel. Per-partition results may carry their own Err (e.g. NotLeaderForPartition, UnknownTopicOrPartition) when metadata is stale; the caller can refresh metadata via the underlying client and retry those partitions if needed.

(partitions map[string]map[int32]int64, options *ListOffsetsOptions)

Source from the content-addressed store, hash-verified

40// metadata via the underlying client and retry those partitions if needed. The
41// retry loop here only covers transport-level failures.
42func (ca *clusterAdmin) ListOffsets(partitions map[string]map[int32]int64, options *ListOffsetsOptions) (map[string]map[int32]*OffsetResult, error) {
43 type topicPartition struct {
44 topic string
45 partition int32
46 }
47
48 type brokerOffsetRequest struct {
49 request *OffsetRequest
50 partitions []topicPartition
51 }
52
53 type brokerOffsetResult struct {
54 result map[topicPartition]*OffsetResult
55 err error
56 }
57
58 if len(partitions) == 0 {
59 return nil, ConfigurationError("no partitions provided")
60 }
61
62 if options == nil {
63 options = &ListOffsetsOptions{}
64 }
65
66 allResults := make(map[string]map[int32]*OffsetResult, len(partitions))
67 setResult := func(topic string, partition int32, result *OffsetResult) {
68 if allResults[topic] == nil {
69 allResults[topic] = make(map[int32]*OffsetResult)
70 }
71 allResults[topic][partition] = result
72 }
73
74 requests := make(map[*Broker]*brokerOffsetRequest)
75 for topic, topicOffsets := range partitions {
76 for partition, offsetQuery := range topicOffsets {
77 broker, _, err := ca.client.LeaderAndEpoch(topic, partition)
78 if err != nil {
79 setResult(topic, partition, &OffsetResult{Err: err})
80 continue
81 }
82
83 req := requests[broker]
84 if req == nil {
85 req = &brokerOffsetRequest{
86 request: NewOffsetRequest(ca.conf.Version),
87 }
88 req.request.IsolationLevel = options.IsolationLevel
89 requests[broker] = req
90 }
91 req.request.AddBlock(topic, partition, offsetQuery, 1)
92 req.partitions = append(req.partitions, topicPartition{topic: topic, partition: partition})
93 }
94 }
95
96 if len(requests) == 0 {
97 return allResults, nil
98 }
99

Callers

nothing calls this directly

Calls 10

retryOnErrorMethod · 0.95
GetBlockMethod · 0.95
ConfigurationErrorTypeAlias · 0.85
NewOffsetRequestFunction · 0.85
OpenMethod · 0.80
GetAvailableOffsetsMethod · 0.80
LeaderAndEpochMethod · 0.65
ConfigMethod · 0.65
AddBlockMethod · 0.45

Tested by

no test coverage detected