MCPcopy
hub / github.com/grpc/grpc-go / processStreamingRPC

Method processStreamingRPC

server.go:1590–1780  ·  server.go::Server.processStreamingRPC
(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo)

Source from the content-addressed store, hash-verified

1588}
1589
1590func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
1591 if channelz.IsOn() {
1592 s.incrCallsStarted()
1593 }
1594 sh := s.statsHandler
1595 var statsBegin *stats.Begin
1596 if sh != nil {
1597 statsBegin = &stats.Begin{
1598 BeginTime: time.Now(),
1599 IsClientStream: sd.ClientStreams,
1600 IsServerStream: sd.ServerStreams,
1601 }
1602 sh.HandleRPC(ctx, statsBegin)
1603 }
1604 ctx = NewContextWithServerTransportStream(ctx, stream)
1605 ss := &serverStream{
1606 ctx: ctx,
1607 s: stream,
1608 p: parser{r: stream, bufferPool: s.opts.bufferPool},
1609 codec: s.getCodec(stream.ContentSubtype()),
1610 desc: sd,
1611 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1612 maxSendMessageSize: s.opts.maxSendMessageSize,
1613 trInfo: trInfo,
1614 statsHandler: sh,
1615 }
1616
1617 if sh != nil || trInfo != nil || channelz.IsOn() {
1618 // See comment in processUnaryRPC on defers.
1619 defer func() {
1620 if trInfo != nil {
1621 ss.mu.Lock()
1622 if err != nil && err != io.EOF {
1623 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
1624 ss.trInfo.tr.SetError()
1625 }
1626 ss.trInfo.tr.Finish()
1627 ss.trInfo.tr = nil
1628 ss.mu.Unlock()
1629 }
1630
1631 if sh != nil {
1632 end := &stats.End{
1633 BeginTime: statsBegin.BeginTime,
1634 EndTime: time.Now(),
1635 }
1636 if err != nil && err != io.EOF {
1637 end.Error = toRPCErr(err)
1638 }
1639 sh.HandleRPC(ctx, end)
1640 }
1641
1642 if channelz.IsOn() {
1643 if err != nil && err != io.EOF {
1644 s.incrCallsFailed()
1645 } else {
1646 s.incrCallsSucceeded()
1647 }

Callers 1

handleStreamMethod · 0.95

Calls 15

incrCallsStartedMethod · 0.95
getCodecMethod · 0.95
incrCallsFailedMethod · 0.95
incrCallsSucceededMethod · 0.95
ContextMethod · 0.95
IsOnFunction · 0.92
GetMethodLoggerFunction · 0.92
FromIncomingContextFunction · 0.92
FromContextFunction · 0.92
GetCompressorFunction · 0.92
NewfFunction · 0.92
ErrorfFunction · 0.92

Tested by

no test coverage detected