(ctx context.Context, startStream func(*ServerStream))
| 390 | } |
| 391 | |
| 392 | func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) { |
| 393 | // With this transport type there will be exactly 1 stream: this HTTP request. |
| 394 | var cancel context.CancelFunc |
| 395 | if ht.timeoutSet { |
| 396 | ctx, cancel = context.WithTimeout(ctx, ht.timeout) |
| 397 | } else { |
| 398 | ctx, cancel = context.WithCancel(ctx) |
| 399 | } |
| 400 | |
| 401 | // requestOver is closed when the status has been written via WriteStatus. |
| 402 | requestOver := make(chan struct{}) |
| 403 | go func() { |
| 404 | select { |
| 405 | case <-requestOver: |
| 406 | case <-ht.closedCh: |
| 407 | case <-ht.req.Context().Done(): |
| 408 | } |
| 409 | cancel() |
| 410 | ht.Close(errors.New("request is done processing")) |
| 411 | }() |
| 412 | |
| 413 | ctx = metadata.NewIncomingContext(ctx, ht.headerMD) |
| 414 | req := ht.req |
| 415 | s := &ServerStream{ |
| 416 | Stream: Stream{ |
| 417 | id: 0, // irrelevant |
| 418 | ctx: ctx, |
| 419 | method: req.URL.Path, |
| 420 | recvCompress: req.Header.Get("grpc-encoding"), |
| 421 | contentSubtype: ht.contentSubtype, |
| 422 | }, |
| 423 | cancel: cancel, |
| 424 | st: ht, |
| 425 | headerWireLength: 0, // won't have access to header wire length until golang/go#18997. |
| 426 | } |
| 427 | s.Stream.buf.init() |
| 428 | s.readRequester = s |
| 429 | s.trReader = transportReader{ |
| 430 | reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf}, |
| 431 | windowHandler: s, |
| 432 | } |
| 433 | |
| 434 | // readerDone is closed when the Body.Read-ing goroutine exits. |
| 435 | readerDone := make(chan struct{}) |
| 436 | go func() { |
| 437 | defer close(readerDone) |
| 438 | |
| 439 | for { |
| 440 | buf := ht.bufferPool.Get(http2MaxFrameLen) |
| 441 | n, err := req.Body.Read(*buf) |
| 442 | if n > 0 { |
| 443 | *buf = (*buf)[:n] |
| 444 | s.buf.put(recvMsg{buffer: mem.NewBuffer(buf, ht.bufferPool)}) |
| 445 | } else { |
| 446 | ht.bufferPool.Put(buf) |
| 447 | } |
| 448 | if err != nil { |
| 449 | s.buf.put(recvMsg{err: mapRecvMsgError(err)}) |
nothing calls this directly
no test coverage detected