run manages the ORCA OOB stream on the subchannel.
(ctx context.Context, done chan struct{}, interval time.Duration)
| 170 | |
| 171 | // run manages the ORCA OOB stream on the subchannel. |
| 172 | func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) { |
| 173 | defer close(done) |
| 174 | |
| 175 | runStream := func() error { |
| 176 | resetBackoff, err := p.runStream(ctx, interval) |
| 177 | if status.Code(err) == codes.Unimplemented { |
| 178 | // Unimplemented; do not retry. |
| 179 | logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.") |
| 180 | return err |
| 181 | } |
| 182 | // Retry for all other errors. |
| 183 | if code := status.Code(err); code != codes.Unavailable && code != codes.Canceled { |
| 184 | // TODO: Unavailable and Canceled should also ideally log an error, |
| 185 | // but for now we receive them when shutting down the ClientConn |
| 186 | // (Unavailable if the stream hasn't started yet, and Canceled if it |
| 187 | // happens mid-stream). Once we can determine the state or ensure |
| 188 | // the producer is stopped before the stream ends, we can log an |
| 189 | // error when it's not a natural shutdown. |
| 190 | logger.Error("Received unexpected stream error:", err) |
| 191 | } |
| 192 | if resetBackoff { |
| 193 | return backoff.ErrResetBackoff |
| 194 | } |
| 195 | return nil |
| 196 | } |
| 197 | backoff.RunF(ctx, runStream, p.backoff) |
| 198 | } |
| 199 | |
| 200 | // runStream runs a single stream on the subchannel and returns the resulting |
| 201 | // error, if any, and whether or not the run loop should reset the backoff |
no test coverage detected