AddResponse adds an HTTP response to the combiner.
(r PipelineResponse)
| 63 | |
| 64 | // AddResponse is used to add an HTTP response to the combiner. |
| 65 | func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error { |
| 66 | if r.IsMetadata() && c.metadata != nil { |
| 67 | c.mu.Lock() |
| 68 | defer c.mu.Unlock() |
| 69 | |
| 70 | if err := c.metadata(r, c.current); err != nil { |
| 71 | return fmt.Errorf("error processing metadata: %w", err) |
| 72 | } |
| 73 | return nil |
| 74 | } |
| 75 | |
| 76 | res := r.HTTPResponse() |
| 77 | if res == nil { |
| 78 | return nil |
| 79 | } |
| 80 | |
| 81 | // todo: reevaluate this. should the caller own the lifecycle of the http.Response body? |
| 82 | defer func() { _ = res.Body.Close() }() |
| 83 | |
| 84 | // test shouldQuit and set response all under the same lock. this prevents race conditions where |
| 85 | // two responses can make it past shouldQuit() with different results. |
| 86 | shouldQuitAndSetResponse := func() (bool, error) { |
| 87 | c.mu.Lock() |
| 88 | defer c.mu.Unlock() |
| 89 | |
| 90 | if c.shouldQuit() { |
| 91 | return true, nil |
| 92 | } |
| 93 | |
| 94 | if res.StatusCode != http.StatusOK { |
| 95 | bytesMsg, err := io.ReadAll(res.Body) |
| 96 | if err != nil { |
| 97 | return true, fmt.Errorf("error reading response body: %w", err) |
| 98 | } |
| 99 | c.httpRespBody = string(bytesMsg) |
| 100 | c.httpStatusCode = res.StatusCode |
| 101 | // don't return error. the error path is reserved for unexpected errors. |
| 102 | // http pipeline errors should be returned through the final response. (Complete/TypedComplete/TypedDiff) |
| 103 | return true, nil |
| 104 | } |
| 105 | |
| 106 | return false, nil |
| 107 | } |
| 108 | |
| 109 | if quit, err := shouldQuitAndSetResponse(); quit { |
| 110 | return err |
| 111 | } |
| 112 | |
| 113 | partial := c.new() // instantiating directly requires additional type constraints. this seemed cleaner: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go |
| 114 | |
| 115 | switch res.Header.Get(api.HeaderContentType) { |
| 116 | case api.HeaderAcceptProtobuf: |
| 117 | b, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength) |
| 118 | if err != nil { |
| 119 | return fmt.Errorf("error reading response body") |
| 120 | } |
| 121 | if err := proto.Unmarshal(b, partial); err != nil { |
| 122 | return fmt.Errorf("error unmarshalling proto response body: %w", err) |
nothing calls this directly
no test coverage detected