MCPcopy
hub / github.com/grafana/tempo / AddResponse

Method AddResponse

modules/frontend/combiner/trace_by_id.go:56–106  ·  view source on GitHub ↗
(r PipelineResponse)

Source from the content-addressed store, hash-verified

54}
55
56func (c *TraceByIDCombiner) AddResponse(r PipelineResponse) error {
57 c.mu.Lock()
58 defer c.mu.Unlock()
59
60 if c.shouldQuit() {
61 return nil
62 }
63
64 res := r.HTTPResponse()
65 if res.StatusCode == http.StatusNotFound {
66 // 404s are not considered errors, so we don't need to do anything.
67 return nil
68 }
69 c.code = res.StatusCode
70
71 if res.StatusCode != http.StatusOK {
72 bytesMsg, err := io.ReadAll(res.Body)
73 if err != nil {
74 return fmt.Errorf("error reading response body: %w", err)
75 }
76 c.statusMessage = string(bytesMsg)
77 return nil
78 }
79
80 // Read the body
81 buff, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength)
82 if err != nil {
83 c.statusMessage = internalErrorMsg
84 return fmt.Errorf("error reading response body: %w", err)
85 }
86 _ = res.Body.Close()
87
88 // Unmarshal the body
89 resp := &tempopb.TraceByIDResponse{}
90 err = resp.Unmarshal(buff)
91 if err != nil {
92 c.statusMessage = internalErrorMsg
93 return fmt.Errorf("error unmarshalling response body: %w", err)
94 }
95
96 // Consume the trace
97 _, err = c.c.Consume(resp.Trace)
98 if errors.Is(err, trace.ErrTraceTooLarge) {
99 c.code = http.StatusUnprocessableEntity
100 c.statusMessage = fmt.Sprint(err)
101 return nil
102 }
103 c.MetricsCombiner.Combine(resp.Metrics, r)
104
105 return err
106}
107
108func (c *TraceByIDCombiner) HTTPFinal() (*http.Response, error) {
109 c.mu.Lock()

Callers

nothing calls this directly

Calls 7

shouldQuitMethod · 0.95
UnmarshalMethod · 0.95
HTTPResponseMethod · 0.65
ReadAllMethod · 0.65
CloseMethod · 0.65
CombineMethod · 0.65
ConsumeMethod · 0.45

Tested by

no test coverage detected