MCPcopy
hub / github.com/grpc/grpc-go / serveStreams

Method serveStreams

server.go:1053–1091  ·  view source on GitHub ↗
(ctx context.Context, st transport.ServerTransport, rawConn net.Conn)

Source from the content-addressed store, hash-verified

1051}
1052
1053func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
1054 ctx = transport.SetConnection(ctx, rawConn)
1055 ctx = peer.NewContext(ctx, st.Peer())
1056 if s.statsHandler != nil {
1057 ctx = s.statsHandler.TagConn(ctx, &stats.ConnTagInfo{
1058 RemoteAddr: st.Peer().Addr,
1059 LocalAddr: st.Peer().LocalAddr,
1060 })
1061 s.statsHandler.HandleConn(ctx, &stats.ConnBegin{})
1062 }
1063
1064 defer func() {
1065 st.Close(errors.New("finished serving streams for the server transport"))
1066 if s.statsHandler != nil {
1067 s.statsHandler.HandleConn(ctx, &stats.ConnEnd{})
1068 }
1069 }()
1070
1071 streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
1072 st.HandleStreams(ctx, func(stream *transport.ServerStream) {
1073 s.handlersWG.Add(1)
1074 streamQuota.acquire()
1075 f := func() {
1076 defer streamQuota.release()
1077 defer s.handlersWG.Done()
1078 s.handleStream(st, stream)
1079 }
1080
1081 if s.opts.numServerWorkers > 0 {
1082 select {
1083 case s.serverWorkerChannel <- f:
1084 return
1085 default:
1086 // If all stream workers are busy, fallback to the default code path.
1087 }
1088 }
1089 go f()
1090 })
1091}
1092
// Compile-time check that *Server satisfies the http.Handler interface.
1093var _ http.Handler = (*Server)(nil)
1094

Callers 2

handleRawConnMethod · 0.95
ServeHTTPMethod · 0.95

Calls 13

handleStreamMethod · 0.95
SetConnectionFunction · 0.92
NewContextFunction · 0.92
newHandlerQuotaFunction · 0.85
acquireMethod · 0.80
releaseMethod · 0.80
PeerMethod · 0.65
TagConnMethod · 0.65
HandleConnMethod · 0.65
CloseMethod · 0.65
HandleStreamsMethod · 0.65
AddMethod · 0.65

Tested by

no test coverage detected