MCPcopy
hub / github.com/grafana/tempo / buildShardedRequests

Method buildShardedRequests

modules/frontend/traceid_sharder.go:83–132  ·  view source on GitHub ↗

buildShardedRequests returns a slice of requests sharded on the precalculated block boundaries

(parent pipeline.Request)

Source from the content-addressed store, hash-verified

81// buildShardedRequests returns a slice of requests sharded on the precalculated
82// block boundaries
83func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pipeline.Request, error) {
84 userID, err := validation.ExtractValidTenantID(parent.Context())
85 if err != nil {
86 return nil, err
87 }
88
89 reqs := make([]pipeline.Request, 0, s.cfg.QueryShards)
90 params := map[string]string{}
91
92 // Job 0: ingester job
93 req, err := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
94 params[querier.QueryModeKey] = querier.QueryModeIngesters
95 return api.BuildQueryRequest(r, params), nil
96 })
97 if err != nil {
98 return nil, err
99 }
100 reqs = append(reqs, req)
101
102 // Job 1: external job (if enabled)
103 if s.cfg.ExternalEnabled {
104 req, err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
105 params[querier.QueryModeKey] = querier.QueryModeExternal
106 return api.BuildQueryRequest(r, params), nil
107 })
108 if err != nil {
109 return nil, err
110 }
111 reqs = append(reqs, req)
112 }
113
114 // Jobs 2 to N-1: block queries
115 // When external is enabled, we have N-2 block shards
116 // When external is disabled, we have N-1 block shards
117 // blockBoundaries has length equal to numBlockShards, and we create shards between boundaries
118 for i := 1; i < len(s.blockBoundaries); i++ {
119 i := i // save the loop variable locally to make sure the closure grabs the correct var.
120 pipelineR, _ := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
121 // block queries
122 params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1])
123 params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i])
124 params[querier.QueryModeKey] = querier.QueryModeBlocks
125
126 return api.BuildQueryRequest(r, params), nil
127 })
128 reqs = append(reqs, pipelineR)
129 }
130
131 return reqs, nil
132}

Callers 3

RoundTripMethod · 0.95
TestBuildShardedRequestsFunction · 0.95

Calls 5

ExtractValidTenantIDFunction · 0.92
BuildQueryRequestFunction · 0.92
cloneRequestforQueriersFunction · 0.85
EncodeToStringMethod · 0.80
ContextMethod · 0.65

Tested by 2

TestBuildShardedRequestsFunction · 0.76