MCPcopy
hub / github.com/grpc/grpc-go / newClientStream

Function newClientStream

stream.go:204–284  ·  view source on GitHub ↗
(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption)

Source from the content-addressed store, hash-verified

202}
203
204func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
205 if channelz.IsOn() {
206 cc.incrCallsStarted()
207 }
208 defer func() {
209 if err != nil {
210 // Ensure cleanup when stream creation fails.
211 endOfClientStream(cc, err, opts...)
212 }
213 }()
214
215 // Start tracking the RPC for idleness purposes. This is where a stream is
216 // created for both streaming and unary RPCs, and hence is a good place to
217 // track active RPC count.
218 cc.idlenessMgr.OnCallBegin()
219
220 // Add a calloption, to decrement the active call count, that gets executed
221 // when the RPC completes.
222 opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
223
224 if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
225 // validate md
226 if err := imetadata.Validate(md); err != nil {
227 return nil, status.Error(codes.Internal, err.Error())
228 }
229 // validate added
230 for _, kvs := range added {
231 for i := 0; i < len(kvs); i += 2 {
232 if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
233 return nil, status.Error(codes.Internal, err.Error())
234 }
235 }
236 }
237 }
238 // Provide an opportunity for the first RPC to see the first service config
239 // provided by the resolver.
240 nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
241 if err != nil {
242 return nil, err
243 }
244
245 mc := &emptyMethodConfig
246 var onCommit func()
247 newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
248 return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
249 }
250
251 rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
252 rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
253 if err != nil {
254 if st, ok := status.FromError(err); ok {
255 // Restrict the code to the list allowed by gRFC A54.
256 if istatus.IsRestrictedControlPlaneCode(st) {
257 err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
258 }
259 return nil, err
260 }
261 return nil, toRPCErr(err)

Callers 2

NewStream (Method) · 0.85
invoke (Function) · 0.85

Calls 15

IsOn (Function) · 0.92
Error (Function) · 0.92
FromError (Function) · 0.92
Errorf (Function) · 0.92
endOfClientStream (Function) · 0.85
OnFinish (Function) · 0.85
toRPCErr (Function) · 0.85
newStream (Function) · 0.85
OnCallBegin (Method) · 0.80
OnCallEnd (Method) · 0.80
waitForResolvedAddrs (Method) · 0.80

Tested by

no test coverage detected