buildShardedRequests returns a slice of requests sharded on the precalculated block boundaries
(parent pipeline.Request)
| 81 | // buildShardedRequests returns a slice of requests sharded on the precalculated |
| 82 | // block boundaries |
| 83 | func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pipeline.Request, error) { |
| 84 | userID, err := validation.ExtractValidTenantID(parent.Context()) |
| 85 | if err != nil { |
| 86 | return nil, err |
| 87 | } |
| 88 | |
| 89 | reqs := make([]pipeline.Request, 0, s.cfg.QueryShards) |
| 90 | params := map[string]string{} |
| 91 | |
| 92 | // Job 0: ingester job |
| 93 | req, err := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { |
| 94 | params[querier.QueryModeKey] = querier.QueryModeIngesters |
| 95 | return api.BuildQueryRequest(r, params), nil |
| 96 | }) |
| 97 | if err != nil { |
| 98 | return nil, err |
| 99 | } |
| 100 | reqs = append(reqs, req) |
| 101 | |
| 102 | // Job 1: external job (if enabled) |
| 103 | if s.cfg.ExternalEnabled { |
| 104 | req, err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { |
| 105 | params[querier.QueryModeKey] = querier.QueryModeExternal |
| 106 | return api.BuildQueryRequest(r, params), nil |
| 107 | }) |
| 108 | if err != nil { |
| 109 | return nil, err |
| 110 | } |
| 111 | reqs = append(reqs, req) |
| 112 | } |
| 113 | |
| 114 | // Jobs 2 to N-1: block queries |
| 115 | // When external is enabled, we have N-2 block shards |
| 116 | // When external is disabled, we have N-1 block shards |
| 117 | // blockBoundaries has length equal to numBlockShards, and we create shards between boundaries |
| 118 | for i := 1; i < len(s.blockBoundaries); i++ { |
| 119 | i := i // save the loop variable locally to make sure the closure grabs the correct var. |
| 120 | pipelineR, _ := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { |
| 121 | // block queries |
| 122 | params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1]) |
| 123 | params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i]) |
| 124 | params[querier.QueryModeKey] = querier.QueryModeBlocks |
| 125 | |
| 126 | return api.BuildQueryRequest(r, params), nil |
| 127 | }) |
| 128 | reqs = append(reqs, pipelineR) |
| 129 | } |
| 130 | |
| 131 | return reqs, nil |
| 132 | } |