(t transport.ServerTransport, stream *transport.ServerStream)
| 1798 | } |
| 1799 | |
| 1800 | func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) { |
| 1801 | ctx := stream.Context() |
| 1802 | ctx = contextWithServer(ctx, s) |
| 1803 | if envconfig.LabelServerGoroutines&envconfig.GoroutineLabelServerMethod != 0 { |
| 1804 | // This method always runs in its own goroutine, so we can set a |
| 1805 | // goroutine label without needing to restore a previous context. |
| 1806 | ctx = pprof.WithLabels(ctx, pprof.Labels("grpc.method", stream.Method())) |
| 1807 | pprof.SetGoroutineLabels(ctx) |
| 1808 | } |
| 1809 | var ti *traceInfo |
| 1810 | if EnableTracing { |
| 1811 | tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) |
| 1812 | ctx = newTraceContext(ctx, tr) |
| 1813 | ti = &traceInfo{ |
| 1814 | tr: tr, |
| 1815 | firstLine: firstLine{ |
| 1816 | client: false, |
| 1817 | remoteAddr: t.Peer().Addr, |
| 1818 | }, |
| 1819 | } |
| 1820 | if dl, ok := ctx.Deadline(); ok { |
| 1821 | ti.firstLine.deadline = time.Until(dl) |
| 1822 | } |
| 1823 | } |
| 1824 | |
| 1825 | sm, found := strings.CutPrefix(stream.Method(), "/") |
| 1826 | if !found { |
| 1827 | s.handleMalformedMethodName(stream, ti) |
| 1828 | return |
| 1829 | } |
| 1830 | pos := strings.LastIndex(sm, "/") |
| 1831 | if pos == -1 { |
| 1832 | s.handleMalformedMethodName(stream, ti) |
| 1833 | return |
| 1834 | } |
| 1835 | service := sm[:pos] |
| 1836 | method := sm[pos+1:] |
| 1837 | |
| 1838 | // FromIncomingContext is expensive: skip if there are no statsHandlers |
| 1839 | if s.statsHandler != nil { |
| 1840 | md, _ := metadata.FromIncomingContext(ctx) |
| 1841 | ctx = s.statsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()}) |
| 1842 | s.statsHandler.HandleRPC(ctx, &stats.InHeader{ |
| 1843 | FullMethod: stream.Method(), |
| 1844 | RemoteAddr: t.Peer().Addr, |
| 1845 | LocalAddr: t.Peer().LocalAddr, |
| 1846 | Compression: stream.RecvCompress(), |
| 1847 | WireLength: stream.HeaderWireLength(), |
| 1848 | Header: md, |
| 1849 | }) |
| 1850 | } |
| 1851 | // To have calls in stream callouts work. Will delete once all stats handler |
| 1852 | // calls come from the gRPC layer. |
| 1853 | stream.SetContext(ctx) |
| 1854 | |
| 1855 | srv, knownService := s.services[service] |
| 1856 | if knownService { |
| 1857 | if md, ok := srv.methods[method]; ok { |
no test coverage detected