(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption)
| 202 | } |
| 203 | |
| 204 | func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { |
| 205 | if channelz.IsOn() { |
| 206 | cc.incrCallsStarted() |
| 207 | } |
| 208 | defer func() { |
| 209 | if err != nil { |
| 210 | // Ensure cleanup when stream creation fails. |
| 211 | endOfClientStream(cc, err, opts...) |
| 212 | } |
| 213 | }() |
| 214 | |
| 215 | // Start tracking the RPC for idleness purposes. This is where a stream is |
| 216 | // created for both streaming and unary RPCs, and hence is a good place to |
| 217 | // track active RPC count. |
| 218 | cc.idlenessMgr.OnCallBegin() |
| 219 | |
| 220 | // Add a calloption, to decrement the active call count, that gets executed |
| 221 | // when the RPC completes. |
| 222 | opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...) |
| 223 | |
| 224 | if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok { |
| 225 | // validate md |
| 226 | if err := imetadata.Validate(md); err != nil { |
| 227 | return nil, status.Error(codes.Internal, err.Error()) |
| 228 | } |
| 229 | // validate added |
| 230 | for _, kvs := range added { |
| 231 | for i := 0; i < len(kvs); i += 2 { |
| 232 | if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil { |
| 233 | return nil, status.Error(codes.Internal, err.Error()) |
| 234 | } |
| 235 | } |
| 236 | } |
| 237 | } |
| 238 | // Provide an opportunity for the first RPC to see the first service config |
| 239 | // provided by the resolver. |
| 240 | nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx) |
| 241 | if err != nil { |
| 242 | return nil, err |
| 243 | } |
| 244 | |
| 245 | mc := &emptyMethodConfig |
| 246 | var onCommit func() |
| 247 | newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) { |
| 248 | return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...) |
| 249 | } |
| 250 | |
| 251 | rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method} |
| 252 | rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo) |
| 253 | if err != nil { |
| 254 | if st, ok := status.FromError(err); ok { |
| 255 | // Restrict the code to the list allowed by gRFC A54. |
| 256 | if istatus.IsRestrictedControlPlaneCode(st) { |
| 257 | err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err) |
| 258 | } |
| 259 | return nil, err |
| 260 | } |
| 261 | return nil, toRPCErr(err) |
no test coverage detected