MCPcopy
hub / github.com/grpc/grpc-go / newNonRetryClientStream

Function newNonRetryClientStream

stream.go:1283–1388  ·  view source on GitHub ↗

newNonRetryClientStream creates a ClientStream with the specified transport, on the given addrConn. It's expected that the given transport is either the same one in addrConn, or is already closed. To avoid race, transport is specified separately, instead of using ac.transport. Main difference betw

(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption)

Source from the content-addressed store, hash-verified

1281// - no service config (or wait for service config)
1282// - no tracing or stats
1283func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
1284 if t == nil {
1285 // TODO: return RPC error here?
1286 return nil, errors.New("transport provided is nil")
1287 }
1288 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1289 c := &callInfo{}
1290
1291 // Possible context leak:
1292 // The cancel function for the child context we create will only be called
1293 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1294 // an error is generated by SendMsg.
1295 // https://github.com/grpc/grpc-go/issues/1818.
1296 ctx, cancel := context.WithCancel(ctx)
1297 defer func() {
1298 if err != nil {
1299 cancel()
1300 }
1301 }()
1302
1303 for _, o := range opts {
1304 if err := o.before(c); err != nil {
1305 return nil, toRPCErr(err)
1306 }
1307 }
1308 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1309 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1310 if err := setCallInfoCodec(c); err != nil {
1311 return nil, err
1312 }
1313
1314 callHdr := &transport.CallHdr{
1315 Host: ac.cc.authority,
1316 Method: method,
1317 ContentSubtype: c.contentSubtype,
1318 }
1319
1320 // Set our outgoing compression according to the UseCompressor CallOption, if
1321 // set. In that case, also find the compressor from the encoding package.
1322 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1323 // if set.
1324 var cp Compressor
1325 var comp encoding.Compressor
1326 if ct := c.compressorName; ct != "" {
1327 callHdr.SendCompress = ct
1328 if ct != encoding.Identity {
1329 comp = encoding.GetCompressor(ct)
1330 if comp == nil {
1331 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1332 }
1333 }
1334 } else if ac.cc.dopts.compressorV0 != nil {
1335 callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
1336 cp = ac.cc.dopts.compressorV0
1337 }
1338 if c.creds != nil {
1339 callHdr.Creds = c.creds
1340 }

Callers 2

startHealthCheckMethod · 0.85
NewStreamMethod · 0.85

Calls 15

finishMethod · 0.95
GetCompressorFunction · 0.92
ErrorfFunction · 0.92
ErrorFunction · 0.92
toRPCErrFunction · 0.85
getMaxSizeFunction · 0.85
setCallInfoCodecFunction · 0.85
ErrMethod · 0.80
beforeMethod · 0.65
TypeMethod · 0.65
NewStreamMethod · 0.65
incrCallsStartedMethod · 0.45

Tested by

no test coverage detected