(n int)
| 225 | } |
| 226 | |
| 227 | func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) { |
| 228 | // If the context is canceled, then closes the stream with nil metadata. |
| 229 | // closeStream writes its error parameter to r.recv as a recvMsg. |
| 230 | // r.readAdditional acts on that message and returns the necessary error. |
| 231 | select { |
| 232 | case <-r.ctxDone: |
| 233 | // Note that this adds the ctx error to the end of recv buffer, and |
| 234 | // reads from the head. This will delay the error until recv buffer is |
| 235 | // empty, thus will delay ctx cancellation in Recv(). |
| 236 | // |
| 237 | // It's done this way to fix a race between ctx cancel and trailer. The |
| 238 | // race was, stream.Recv() may return ctx error if ctxDone wins the |
| 239 | // race, but stream.Trailer() may return a non-nil md because the stream |
| 240 | // was not marked as done when trailer is received. This closeStream |
| 241 | // call will mark stream as done, thus fix the race. |
| 242 | // |
| 243 | // TODO: delaying ctx error seems like a unnecessary side effect. What |
| 244 | // we really want is to mark the stream as done, and return ctx error |
| 245 | // faster. |
| 246 | r.clientStream.Close(ContextErr(r.ctx.Err())) |
| 247 | m := <-r.recv.get() |
| 248 | return r.readAdditional(m, n) |
| 249 | case m := <-r.recv.get(): |
| 250 | return r.readAdditional(m, n) |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) { |
| 255 | r.recv.load() |
no test coverage detected