MCPcopy
hub / github.com/grpc/grpc-go / handleStream

Method handleStream

server.go:1800–1891  ·  view source on GitHub ↗
(t transport.ServerTransport, stream *transport.ServerStream)

Source from the content-addressed store, hash-verified

1798}
1799
1800func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
1801 ctx := stream.Context()
1802 ctx = contextWithServer(ctx, s)
1803 if envconfig.LabelServerGoroutines&envconfig.GoroutineLabelServerMethod != 0 {
1804 // This method always runs in its own goroutine, so we can set a
1805 // goroutine label without needing to restore a previous context.
1806 ctx = pprof.WithLabels(ctx, pprof.Labels("grpc.method", stream.Method()))
1807 pprof.SetGoroutineLabels(ctx)
1808 }
1809 var ti *traceInfo
1810 if EnableTracing {
1811 tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
1812 ctx = newTraceContext(ctx, tr)
1813 ti = &traceInfo{
1814 tr: tr,
1815 firstLine: firstLine{
1816 client: false,
1817 remoteAddr: t.Peer().Addr,
1818 },
1819 }
1820 if dl, ok := ctx.Deadline(); ok {
1821 ti.firstLine.deadline = time.Until(dl)
1822 }
1823 }
1824
1825 sm, found := strings.CutPrefix(stream.Method(), "/")
1826 if !found {
1827 s.handleMalformedMethodName(stream, ti)
1828 return
1829 }
1830 pos := strings.LastIndex(sm, "/")
1831 if pos == -1 {
1832 s.handleMalformedMethodName(stream, ti)
1833 return
1834 }
1835 service := sm[:pos]
1836 method := sm[pos+1:]
1837
1838 // FromIncomingContext is expensive: skip if there are no statsHandlers
1839 if s.statsHandler != nil {
1840 md, _ := metadata.FromIncomingContext(ctx)
1841 ctx = s.statsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
1842 s.statsHandler.HandleRPC(ctx, &stats.InHeader{
1843 FullMethod: stream.Method(),
1844 RemoteAddr: t.Peer().Addr,
1845 LocalAddr: t.Peer().LocalAddr,
1846 Compression: stream.RecvCompress(),
1847 WireLength: stream.HeaderWireLength(),
1848 Header: md,
1849 })
1850 }
1851 // To have calls in stream callouts work. Will delete once all stats handler
1852 // calls come from the gRPC layer.
1853 stream.SetContext(ctx)
1854
1855 srv, knownService := s.services[service]
1856 if knownService {
1857 if md, ok := srv.methods[method]; ok {

Callers 1

serveStreams (Method) · 0.95

Calls 15

processUnaryRPC (Method) · 0.95
processStreamingRPC (Method) · 0.95
FromIncomingContext (Function) · 0.92
New (Function) · 0.92
Warningf (Function) · 0.92
contextWithServer (Function) · 0.85
methodFamily (Function) · 0.85
HeaderWireLength (Method) · 0.80
SetContext (Method) · 0.80
WriteStatus (Method) · 0.80
newTrace (Function) · 0.70

Tested by

no test coverage detected