New makes a new Querier. Recent data is queried via the partition ring.
( cfg Config, liveStoreRing ring.ReadRing, liveStoreClientConfig livestore_client.Config, partitionRing *ring.PartitionInstanceRing, queryExternal bool, store storage.Store, limits overrides.Interface, )
| 75 | // |
| 76 | // Recent data is queried via the partition ring. |
| 77 | func New( |
| 78 | cfg Config, |
| 79 | |
| 80 | liveStoreRing ring.ReadRing, |
| 81 | liveStoreClientConfig livestore_client.Config, |
| 82 | partitionRing *ring.PartitionInstanceRing, |
| 83 | |
| 84 | queryExternal bool, |
| 85 | store storage.Store, |
| 86 | limits overrides.Interface, |
| 87 | ) (*Querier, error) { |
| 88 | var liveStoreClientFactory ring_client.PoolAddrFunc = func(addr string) (ring_client.PoolClient, error) { |
| 89 | return livestore_client.New(addr, liveStoreClientConfig) |
| 90 | } |
| 91 | |
| 92 | q := &Querier{ |
| 93 | cfg: cfg, |
| 94 | partitionRing: partitionRing, |
| 95 | liveStorePool: ring_client.NewPool("querier_to_livestore_pool", |
| 96 | liveStoreClientConfig.PoolConfig, |
| 97 | ring_client.NewRingServiceDiscovery(liveStoreRing), |
| 98 | liveStoreClientFactory, |
| 99 | metricMetricsLiveStoreClients, |
| 100 | log.Logger), |
| 101 | engine: traceql.NewEngine(), |
| 102 | store: store, |
| 103 | limits: limits, |
| 104 | } |
| 105 | |
| 106 | if queryExternal { |
| 107 | externalClient, err := external.NewClient(cfg.TraceByID.External.Endpoint, cfg.TraceByID.External.Timeout) |
| 108 | if err != nil { |
| 109 | return nil, fmt.Errorf("failed to create external client: %w", err) |
| 110 | } |
| 111 | q.externalClient = externalClient |
| 112 | } |
| 113 | |
| 114 | q.Service = services.NewBasicService(q.starting, q.running, q.stopping) |
| 115 | return q, nil |
| 116 | } |
| 117 | |
| 118 | func (q *Querier) CreateAndRegisterWorker(handler http.Handler) error { |
| 119 | q.cfg.Worker.MaxConcurrentRequests = q.cfg.MaxConcurrentQueries |