newNonRetryClientStream creates a ClientStream with the specified transport, on the given addrConn. The given transport is expected to be either the same one held by addrConn or one that is already closed. To avoid a race, the transport is passed in separately instead of being read from ac.transport. The main differences from the regular client-stream path are listed in the function's comment (no service config, no tracing or stats, …).
(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption)
| 1281 | // - no service config (or wait for service config) |
| 1282 | // - no tracing or stats |
| 1283 | func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) { |
| 1284 | if t == nil { |
| 1285 | // TODO: return RPC error here? |
| 1286 | return nil, errors.New("transport provided is nil") |
| 1287 | } |
| 1288 | // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct. |
| 1289 | c := &callInfo{} |
| 1290 | |
| 1291 | // Possible context leak: |
| 1292 | // The cancel function for the child context we create will only be called |
| 1293 | // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if |
| 1294 | // an error is generated by SendMsg. |
| 1295 | // https://github.com/grpc/grpc-go/issues/1818. |
| 1296 | ctx, cancel := context.WithCancel(ctx) |
| 1297 | defer func() { |
| 1298 | if err != nil { |
| 1299 | cancel() |
| 1300 | } |
| 1301 | }() |
| 1302 | |
| 1303 | for _, o := range opts { |
| 1304 | if err := o.before(c); err != nil { |
| 1305 | return nil, toRPCErr(err) |
| 1306 | } |
| 1307 | } |
| 1308 | c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) |
| 1309 | c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize) |
| 1310 | if err := setCallInfoCodec(c); err != nil { |
| 1311 | return nil, err |
| 1312 | } |
| 1313 | |
| 1314 | callHdr := &transport.CallHdr{ |
| 1315 | Host: ac.cc.authority, |
| 1316 | Method: method, |
| 1317 | ContentSubtype: c.contentSubtype, |
| 1318 | } |
| 1319 | |
| 1320 | // Set our outgoing compression according to the UseCompressor CallOption, if |
| 1321 | // set. In that case, also find the compressor from the encoding package. |
| 1322 | // Otherwise, use the compressor configured by the WithCompressor DialOption, |
| 1323 | // if set. |
| 1324 | var cp Compressor |
| 1325 | var comp encoding.Compressor |
| 1326 | if ct := c.compressorName; ct != "" { |
| 1327 | callHdr.SendCompress = ct |
| 1328 | if ct != encoding.Identity { |
| 1329 | comp = encoding.GetCompressor(ct) |
| 1330 | if comp == nil { |
| 1331 | return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) |
| 1332 | } |
| 1333 | } |
| 1334 | } else if ac.cc.dopts.compressorV0 != nil { |
| 1335 | callHdr.SendCompress = ac.cc.dopts.compressorV0.Type() |
| 1336 | cp = ac.cc.dopts.compressorV0 |
| 1337 | } |
| 1338 | if c.creds != nil { |
| 1339 | callHdr.Creds = c.creds |
| 1340 | } |
no test coverage detected