MCPcopy
hub / github.com/grafana/tempo / TestTotalJobsIncludesIngester

Function TestTotalJobsIncludesIngester

modules/frontend/search_sharder_test.go:808–873  ·  modules/frontend/search_sharder_test.go::TestTotalJobsIncludesIngester
(t *testing.T)

Source from the content-addressed store, hash-verified

806}
807
808func TestTotalJobsIncludesIngester(t *testing.T) {
809 next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
810 resString, err := (&jsonpb.Marshaler{}).MarshalToString(&tempopb.SearchResponse{
811 Metrics: &tempopb.SearchMetrics{},
812 })
813 require.NoError(t, err)
814
815 return pipeline.NewHTTPToAsyncResponse(&http.Response{
816 Body: io.NopCloser(strings.NewReader(resString)),
817 StatusCode: 200,
818 }), nil
819 })
820
821 o, err := overrides.NewOverrides(overrides.Config{}, nil, prometheus.DefaultRegisterer)
822 require.NoError(t, err)
823
824 blockTime := time.Now().Add(-10 * time.Minute).Unix()
825
826 sharder := newAsyncSearchSharder(&mockReader{
827 metas: []*backend.BlockMeta{ // one block with 2 records that are each the target bytes per request will force 2 sub queries
828 {
829 StartTime: time.Unix(blockTime, 0),
830 EndTime: time.Unix(blockTime, 0),
831 Size_: defaultTargetBytesPerRequest * 2,
832 TotalRecords: 2,
833 BlockID: backend.MustParse("00000000-0000-0000-0000-000000000000"),
834 },
835 },
836 }, o, SearchSharderConfig{
837 QueryBackendAfter: 5 * time.Minute,
838 ConcurrentRequests: 1, // 1 concurrent request to force order
839 TargetBytesPerRequest: defaultTargetBytesPerRequest,
840 MostRecentShards: defaultMostRecentShards,
841 IngesterShards: 1,
842 }, nil, newJobsPerQueryHistogram(), log.NewNopLogger())
843 testRT := sharder.Wrap(next)
844
845 // query range straddles the QueryBackendAfter boundary so both backend and ingester are queried
846 path := fmt.Sprintf("/?start=%d&end=%d", blockTime-1, time.Now().Unix())
847 req := httptest.NewRequest("GET", path, nil)
848 ctx := req.Context()
849 ctx = user.InjectOrgID(ctx, "blerg")
850 req = req.WithContext(ctx)
851
852 resps, err := testRT.RoundTrip(pipeline.NewHTTPRequest(req))
853 require.NoError(t, err)
854 // find the metadata response; it is the one that reports the total jobs count
855
856 totalJobs := 0
857 for {
858 res, done, err := resps.Next(context.Background())
859
860 if res.IsMetadata() {
861 searchJobResponse := res.(*combiner.SearchJobResponse)
862 totalJobs += searchJobResponse.TotalJobs
863
864 break
865 }

Callers

nothing calls this directly

Calls 14

ContextMethod · 0.95
NewHTTPToAsyncResponseFunction · 0.92
NewOverridesFunction · 0.92
MustParseFunction · 0.92
NewHTTPRequestFunction · 0.92
newAsyncSearchSharderFunction · 0.85
newJobsPerQueryHistogramFunction · 0.85
AddMethod · 0.65
NowMethod · 0.65
WrapMethod · 0.65
RoundTripMethod · 0.65
NextMethod · 0.65

Tested by

no test coverage detected