runStream runs a single stream on the subchannel and returns the resulting error, if any, together with a flag telling the run loop whether to reset the backoff timer to zero (true) or advance it (false).
(ctx context.Context, interval time.Duration)
| 201 | // error, if any, and whether or not the run loop should reset the backoff |
| 202 | // timer to zero or advance it. |
| 203 | func (p *producer) runStream(ctx context.Context, interval time.Duration) (resetBackoff bool, err error) { |
| 204 | streamCtx, cancel := context.WithCancel(ctx) |
| 205 | defer cancel() |
| 206 | stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{ |
| 207 | ReportInterval: durationpb.New(interval), |
| 208 | }) |
| 209 | if err != nil { |
| 210 | return false, err |
| 211 | } |
| 212 | |
| 213 | for { |
| 214 | report, err := stream.Recv() |
| 215 | if err != nil { |
| 216 | return resetBackoff, err |
| 217 | } |
| 218 | resetBackoff = true |
| 219 | p.mu.Lock() |
| 220 | for l := range p.listeners { |
| 221 | l.OnLoadReport(report) |
| 222 | } |
| 223 | p.mu.Unlock() |
| 224 | } |
| 225 | } |
no test coverage detected