(m any)
| 1495 | } |
| 1496 | |
| 1497 | func (as *addrConnStream) RecvMsg(m any) (err error) { |
| 1498 | defer func() { |
| 1499 | if err != nil || !as.desc.ServerStreams { |
| 1500 | // err != nil or non-server-streaming indicates end of stream. |
| 1501 | as.finish(err) |
| 1502 | } |
| 1503 | }() |
| 1504 | |
| 1505 | if !as.decompressorSet { |
| 1506 | // Block until we receive headers containing received message encoding. |
| 1507 | if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity { |
| 1508 | if as.decompressorV0 == nil || as.decompressorV0.Type() != ct { |
| 1509 | // No configured decompressor, or it does not match the incoming |
| 1510 | // message encoding; attempt to find a registered compressor that does. |
| 1511 | as.decompressorV0 = nil |
| 1512 | as.decompressorV1 = encoding.GetCompressor(ct) |
| 1513 | } |
| 1514 | // Validate that the compression method is acceptable for this call. |
| 1515 | if !acceptedCompressorAllows(as.callInfo.acceptedResponseCompressors, ct) { |
| 1516 | return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct) |
| 1517 | } |
| 1518 | } else { |
| 1519 | // No compression is used; disable our decompressor. |
| 1520 | as.decompressorV0 = nil |
| 1521 | } |
| 1522 | // Only initialize this state once per stream. |
| 1523 | as.decompressorSet = true |
| 1524 | } |
| 1525 | if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil { |
| 1526 | if err == io.EOF { |
| 1527 | if statusErr := as.transportStream.Status().Err(); statusErr != nil { |
| 1528 | return statusErr |
| 1529 | } |
| 1530 | // Received no msg and status OK for non-server streaming rpcs. |
| 1531 | if !as.desc.ServerStreams && !as.receivedFirstMsg { |
| 1532 | return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") |
| 1533 | } |
| 1534 | return io.EOF // indicates successful end of stream. |
| 1535 | } |
| 1536 | return toRPCErr(err) |
| 1537 | } |
| 1538 | as.receivedFirstMsg = true |
| 1539 | |
| 1540 | if as.desc.ServerStreams { |
| 1541 | // Subsequent messages should be received by subsequent RecvMsg calls. |
| 1542 | return nil |
| 1543 | } |
| 1544 | |
| 1545 | // Special handling for non-server-stream rpcs. |
| 1546 | // This recv expects EOF or errors, so we don't collect inPayload. |
| 1547 | if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF { |
| 1548 | return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success |
| 1549 | } else if err != nil { |
| 1550 | return toRPCErr(err) |
| 1551 | } |
| 1552 | return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message") |
| 1553 | } |
| 1554 |
nothing calls this directly
no test coverage detected