MCPcopy
hub / github.com/grpc/grpc-go / newClientStreamWithParams

Function newClientStreamWithParams

stream.go:286–436  ·  view source on GitHub ↗
(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc *serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption)

Source from the content-addressed store, hash-verified

284}
285
286func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc *serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
287 callInfo := defaultCallInfo()
288 if mc.WaitForReady != nil {
289 callInfo.failFast = !*mc.WaitForReady
290 }
291
292 // Possible context leak:
293 // The cancel function for the child context we create will only be called
294 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
295 // an error is generated by SendMsg.
296 // https://github.com/grpc/grpc-go/issues/1818.
297 var cancel context.CancelFunc
298 if mc.Timeout != nil && *mc.Timeout >= 0 {
299 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
300 } else {
301 ctx, cancel = context.WithCancel(ctx)
302 }
303 defer func() {
304 if err != nil {
305 cancel()
306 }
307 }()
308
309 for _, o := range opts {
310 if err := o.before(callInfo); err != nil {
311 return nil, toRPCErr(err)
312 }
313 }
314 callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
315 callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
316 if err := setCallInfoCodec(callInfo); err != nil {
317 return nil, err
318 }
319
320 callHdr := &transport.CallHdr{
321 Host: cc.authority,
322 Method: method,
323 ContentSubtype: callInfo.contentSubtype,
324 DoneFunc: doneFunc,
325 Authority: callInfo.authority,
326 }
327 if allowed := callInfo.acceptedResponseCompressors; len(allowed) > 0 {
328 headerValue := strings.Join(allowed, ",")
329 callHdr.AcceptedCompressors = &headerValue
330 }
331
332 // Set our outgoing compression according to the UseCompressor CallOption, if
333 // set. In that case, also find the compressor from the encoding package.
334 // Otherwise, use the compressor configured by the WithCompressor DialOption,
335 // if set.
336 var compressorV0 Compressor
337 var compressorV1 encoding.Compressor
338 if ct := callInfo.compressorName; ct != "" {
339 callHdr.SendCompress = ct
340 if ct != encoding.Identity {
341 compressorV1 = encoding.GetCompressor(ct)
342 if compressorV1 == nil {
343 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)

Callers 1

newClientStreamFunction · 0.85

Calls 15

withRetryMethod · 0.95
bufferForRetryLockedMethod · 0.95
finishMethod · 0.95
GetCompressorFunction · 0.92
ErrorfFunction · 0.92
GetMethodLoggerFunction · 0.92
FromOutgoingContextFunction · 0.92
defaultCallInfoFunction · 0.85
toRPCErrFunction · 0.85
getMaxSizeFunction · 0.85
setCallInfoCodecFunction · 0.85
JoinMethod · 0.80

Tested by

no test coverage detected