(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo)
| 1588 | } |
| 1589 | |
| 1590 | func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { |
| 1591 | if channelz.IsOn() { |
| 1592 | s.incrCallsStarted() |
| 1593 | } |
| 1594 | sh := s.statsHandler |
| 1595 | var statsBegin *stats.Begin |
| 1596 | if sh != nil { |
| 1597 | statsBegin = &stats.Begin{ |
| 1598 | BeginTime: time.Now(), |
| 1599 | IsClientStream: sd.ClientStreams, |
| 1600 | IsServerStream: sd.ServerStreams, |
| 1601 | } |
| 1602 | sh.HandleRPC(ctx, statsBegin) |
| 1603 | } |
| 1604 | ctx = NewContextWithServerTransportStream(ctx, stream) |
| 1605 | ss := &serverStream{ |
| 1606 | ctx: ctx, |
| 1607 | s: stream, |
| 1608 | p: parser{r: stream, bufferPool: s.opts.bufferPool}, |
| 1609 | codec: s.getCodec(stream.ContentSubtype()), |
| 1610 | desc: sd, |
| 1611 | maxReceiveMessageSize: s.opts.maxReceiveMessageSize, |
| 1612 | maxSendMessageSize: s.opts.maxSendMessageSize, |
| 1613 | trInfo: trInfo, |
| 1614 | statsHandler: sh, |
| 1615 | } |
| 1616 | |
| 1617 | if sh != nil || trInfo != nil || channelz.IsOn() { |
| 1618 | // See comment in processUnaryRPC on defers. |
| 1619 | defer func() { |
| 1620 | if trInfo != nil { |
| 1621 | ss.mu.Lock() |
| 1622 | if err != nil && err != io.EOF { |
| 1623 | ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true) |
| 1624 | ss.trInfo.tr.SetError() |
| 1625 | } |
| 1626 | ss.trInfo.tr.Finish() |
| 1627 | ss.trInfo.tr = nil |
| 1628 | ss.mu.Unlock() |
| 1629 | } |
| 1630 | |
| 1631 | if sh != nil { |
| 1632 | end := &stats.End{ |
| 1633 | BeginTime: statsBegin.BeginTime, |
| 1634 | EndTime: time.Now(), |
| 1635 | } |
| 1636 | if err != nil && err != io.EOF { |
| 1637 | end.Error = toRPCErr(err) |
| 1638 | } |
| 1639 | sh.HandleRPC(ctx, end) |
| 1640 | } |
| 1641 | |
| 1642 | if channelz.IsOn() { |
| 1643 | if err != nil && err != io.EOF { |
| 1644 | s.incrCallsFailed() |
| 1645 | } else { |
| 1646 | s.incrCallsSucceeded() |
| 1647 | } |
no test coverage detected