MCPcopy
hub / github.com/grpc/grpc-go / HandleStreams

Method HandleStreams

internal/transport/handler_server.go:392–467  ·  view source on GitHub ↗
(ctx context.Context, startStream func(*ServerStream))

Source from the content-addressed store, hash-verified

390}
391
392func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {
393 // With this transport type there will be exactly 1 stream: this HTTP request.
394 var cancel context.CancelFunc
395 if ht.timeoutSet {
396 ctx, cancel = context.WithTimeout(ctx, ht.timeout)
397 } else {
398 ctx, cancel = context.WithCancel(ctx)
399 }
400
401 // requestOver is closed when the status has been written via WriteStatus.
402 requestOver := make(chan struct{})
403 go func() {
404 select {
405 case <-requestOver:
406 case <-ht.closedCh:
407 case <-ht.req.Context().Done():
408 }
409 cancel()
410 ht.Close(errors.New("request is done processing"))
411 }()
412
413 ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
414 req := ht.req
415 s := &ServerStream{
416 Stream: Stream{
417 id: 0, // irrelevant
418 ctx: ctx,
419 method: req.URL.Path,
420 recvCompress: req.Header.Get("grpc-encoding"),
421 contentSubtype: ht.contentSubtype,
422 },
423 cancel: cancel,
424 st: ht,
425 headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
426 }
427 s.Stream.buf.init()
428 s.readRequester = s
429 s.trReader = transportReader{
430 reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf},
431 windowHandler: s,
432 }
433
434 // readerDone is closed when the Body.Read-ing goroutine exits.
435 readerDone := make(chan struct{})
436 go func() {
437 defer close(readerDone)
438
439 for {
440 buf := ht.bufferPool.Get(http2MaxFrameLen)
441 n, err := req.Body.Read(*buf)
442 if n > 0 {
443 *buf = (*buf)[:n]
444 s.buf.put(recvMsg{buffer: mem.NewBuffer(buf, ht.bufferPool)})
445 } else {
446 ht.bufferPool.Put(buf)
447 }
448 if err != nil {
449 s.buf.put(recvMsg{err: mapRecvMsgError(err)})

Callers

nothing calls this directly

Calls 13

Close (Method) · 0.95
runStream (Method) · 0.95
NewIncomingContext (Function) · 0.92
NewBuffer (Function) · 0.92
mapRecvMsgError (Function) · 0.85
Context (Method) · 0.65
Get (Method) · 0.65
Read (Method) · 0.65
Put (Method) · 0.65
Close (Method) · 0.65
Done (Method) · 0.45
init (Method) · 0.45

Tested by

no test coverage detected