MCPcopy
hub / github.com/grafana/tempo / NewSearch

Function NewSearch

modules/frontend/combiner/search.go:36–139  ·  view source on GitHub ↗

NewSearch returns a search combiner

(limit int, keepMostRecent bool, marshalingFormat api.MarshallingFormat, padTraceIDs bool)

Source from the content-addressed store, hash-verified

34
35// NewSearch returns a search combiner
36func NewSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingFormat, padTraceIDs bool) Combiner {
37 metadataCombiner := traceql.NewMetadataCombiner(limit, keepMostRecent)
38 diffTraces := map[string]struct{}{}
39 completedThroughTracker := &shardtracker.CompletionTracker{}
40 metricsCombiner := NewSearchMetricsCombiner()
41
42 c := &genericCombiner[*tempopb.SearchResponse]{
43 httpStatusCode: 200,
44 new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} },
45 current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}},
46 combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, resp PipelineResponse) error {
47 requestIdx, ok := resp.RequestData().(int)
48 if ok && keepMostRecent {
49 completedThroughTracker.AddShardIdx(requestIdx)
50 }
51
52 for _, t := range partial.Traces {
53 if metadataCombiner.AddMetadata(t) {
54 // record modified traces
55 diffTraces[t.TraceID] = struct{}{}
56 }
57 }
58
59 metricsCombiner.Combine(partial.Metrics, resp)
60
61 return nil
62 },
63 metadata: func(resp PipelineResponse, final *tempopb.SearchResponse) error {
64 if sj, ok := resp.(*SearchJobResponse); ok && sj != nil {
65 sjMetrics := &tempopb.SearchMetrics{
66 TotalBlocks: uint32(sj.TotalBlocks), //nolint:gosec
67 TotalJobs: uint32(sj.TotalJobs), //nolint:gosec
68 TotalBlockBytes: sj.TotalBytes,
69 }
70 metricsCombiner.CombineMetadata(sjMetrics, resp)
71
72 if keepMostRecent {
73 completedThroughTracker.AddShards(sj.Shards)
74 }
75 }
76
77 return nil
78 },
79 finalize: func(final *tempopb.SearchResponse) (*tempopb.SearchResponse, error) {
80 // metrics are already combined on the passed in final
81 final.Traces = metadataCombiner.Metadata()
82 final.Metrics = metricsCombiner.Metrics
83 addRootSpanNotReceivedText(final.Traces)
84 if padTraceIDs {
85 padTraceIDsInResponse(final.Traces)
86 }
87 return final, nil
88 },
89 diff: func(current *tempopb.SearchResponse) (*tempopb.SearchResponse, error) {
90 // wipe out any existing traces and recreate from the map
91 diff := &tempopb.SearchResponse{
92 Traces: make([]*tempopb.TraceSearchMetadata, 0, len(diffTraces)),
93 Metrics: metricsCombiner.Metrics,

Calls 15

AddShardIdxMethod · 0.95
AddMetadataMethod · 0.95
CombineMethod · 0.95
CombineMetadataMethod · 0.95
AddShardsMethod · 0.95
MetadataMethod · 0.95
MetadataAfterMethod · 0.95
IsCompleteForMethod · 0.95
NewMetadataCombinerFunction · 0.92
NewSearchMetricsCombinerFunction · 0.85