(header []byte)
| 198 | } |
| 199 | |
| 200 | func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) { |
| 201 | // Waits for either r.ctxDone or the next message from r.recv, then hands that message and header to r.readMessageHeaderAdditional, whose (n, err) result is returned unchanged. |
| 202 | // When r.ctxDone fires first, the client stream is closed with ContextErr(r.ctx.Err()); Close is presumably what queues that error on r.recv as a message (the code blocks on r.recv.get() right after calling it) - TODO confirm against clientStream.Close. |
| 203 | // r.readMessageHeaderAdditional acts on that message and returns the necessary error. |
| 204 | select { |
| 205 | case <-r.ctxDone: |
| 206 | // Note that this adds the ctx error to the end of the recv buffer, and |
| 207 | // reads from the head. This will delay the error until the recv buffer is |
| 208 | // empty, and thus will delay ctx cancellation in Recv(). |
| 209 | // |
| 210 | // It's done this way to fix a race between ctx cancel and trailer. The |
| 211 | // race was: stream.Recv() may return the ctx error if ctxDone wins the |
| 212 | // race, but stream.Trailer() may return a non-nil md because the stream |
| 213 | // was not marked as done when the trailer was received. This Close |
| 214 | // call will mark the stream as done, thus fixing the race. |
| 215 | // |
| 216 | // TODO: delaying the ctx error seems like an unnecessary side effect. What |
| 217 | // we really want is to mark the stream as done, and return the ctx error |
| 218 | // faster. |
| 219 | r.clientStream.Close(ContextErr(r.ctx.Err())) |
| 220 | m := <-r.recv.get() |
| 221 | return r.readMessageHeaderAdditional(m, header) |
| 222 | case m := <-r.recv.get(): |
| 223 | return r.readMessageHeaderAdditional(m, header) |
| 224 | } |
| 225 | } |
| 226 | |
| 227 | func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) { |
| 228 | // If the context is canceled, then closes the stream with nil metadata. |
no test coverage detected