| 237 | } |
| 238 | |
| 239 | func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { |
| 240 | select { |
| 241 | case f.reqCh <- req: |
| 242 | case <-stream.Context().Done(): |
| 243 | return stream.Context().Err() |
| 244 | } |
| 245 | for { |
| 246 | var resp any |
| 247 | select { |
| 248 | case resp = <-f.respCh: |
| 249 | case <-stream.Context().Done(): |
| 250 | return stream.Context().Err() |
| 251 | } |
| 252 | |
| 253 | if err, ok := resp.(error); ok { |
| 254 | return err |
| 255 | } |
| 256 | if err := stream.Send(resp.(*v3orcapb.OrcaLoadReport)); err != nil { |
| 257 | // In the event that a stream error occurs, a new stream will have |
| 258 | // been created that was waiting for this response message. Push |
| 259 | // it back onto the channel and return. |
| 260 | // |
| 261 | // This happens because we range over respCh. If we changed to |
| 262 | // instead select on respCh + stream.Context(), the same situation |
| 263 | // could still occur due to a race between noticing the two events, |
| 264 | // so such a workaround would still be needed to prevent flakiness. |
| 265 | f.respCh <- resp |
| 266 | return err |
| 267 | } |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | // TestProducerBackoff verifies that the ORCA producer applies the proper |
| 272 | // backoff after stream failures. |