MCPcopy
hub / github.com/grpc/grpc-go / RecvMsg

Method RecvMsg

stream.go:1497–1553  ·  view source on GitHub ↗
(m any)

Source from the content-addressed store, hash-verified

1495}
1496
1497func (as *addrConnStream) RecvMsg(m any) (err error) {
1498 defer func() {
1499 if err != nil || !as.desc.ServerStreams {
1500 // err != nil or non-server-streaming indicates end of stream.
1501 as.finish(err)
1502 }
1503 }()
1504
1505 if !as.decompressorSet {
1506 // Block until we receive headers containing received message encoding.
1507 if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1508 if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
1509 // No configured decompressor, or it does not match the incoming
1510 // message encoding; attempt to find a registered compressor that does.
1511 as.decompressorV0 = nil
1512 as.decompressorV1 = encoding.GetCompressor(ct)
1513 }
1514 // Validate that the compression method is acceptable for this call.
1515 if !acceptedCompressorAllows(as.callInfo.acceptedResponseCompressors, ct) {
1516 return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
1517 }
1518 } else {
1519 // No compression is used; disable our decompressor.
1520 as.decompressorV0 = nil
1521 }
1522 // Only initialize this state once per stream.
1523 as.decompressorSet = true
1524 }
1525 if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
1526 if err == io.EOF {
1527 if statusErr := as.transportStream.Status().Err(); statusErr != nil {
1528 return statusErr
1529 }
1530 // Received no msg and status OK for non-server streaming rpcs.
1531 if !as.desc.ServerStreams && !as.receivedFirstMsg {
1532 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1533 }
1534 return io.EOF // indicates successful end of stream.
1535 }
1536 return toRPCErr(err)
1537 }
1538 as.receivedFirstMsg = true
1539
1540 if as.desc.ServerStreams {
1541 // Subsequent messages should be received by subsequent RecvMsg calls.
1542 return nil
1543 }
1544
1545 // Special handling for non-server-stream rpcs.
1546 // This recv expects EOF or errors, so we don't collect inPayload.
1547 if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
1548 return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
1549 } else if err != nil {
1550 return toRPCErr(err)
1551 }
1552 return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1553}
1554

Callers

nothing calls this directly

Calls 11

finishMethod · 0.95
GetCompressorFunction · 0.92
ErrorfFunction · 0.92
ErrorFunction · 0.92
acceptedCompressorAllowsFunction · 0.85
recvFunction · 0.85
toRPCErrFunction · 0.85
ErrMethod · 0.80
StatusMethod · 0.80
RecvCompressMethod · 0.65
TypeMethod · 0.65

Tested by

no test coverage detected