MCPcopy
hub / github.com/grpc/grpc-go / recvMsg

Method recvMsg

stream.go:1147–1218  ·  view source on GitHub ↗
(m any, payInfo *payloadInfo)

Source from the content-addressed store, hash-verified

1145}
1146
1147func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
1148 cs := a.cs
1149 if a.statsHandler != nil && payInfo == nil {
1150 payInfo = &payloadInfo{}
1151 defer payInfo.free()
1152 }
1153
1154 if !a.decompressorSet {
1155 // Block until we receive headers containing received message encoding.
1156 if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
1157 if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
1158 // No configured decompressor, or it does not match the incoming
1159 // message encoding; attempt to find a registered compressor that does.
1160 a.decompressorV0 = nil
1161 a.decompressorV1 = encoding.GetCompressor(ct)
1162 }
1163 // Validate that the compression method is acceptable for this call.
1164 if !acceptedCompressorAllows(cs.callInfo.acceptedResponseCompressors, ct) {
1165 return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
1166 }
1167 } else {
1168 // No compression is used; disable our decompressor.
1169 a.decompressorV0 = nil
1170 }
1171 // Only initialize this state once per stream.
1172 a.decompressorSet = true
1173 }
1174 if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
1175 if err == io.EOF {
1176 if statusErr := a.transportStream.Status().Err(); statusErr != nil {
1177 return statusErr
1178 }
1179 // Received no msg and status OK for non-server streaming rpcs.
1180 if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
1181 return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
1182 }
1183 return io.EOF // indicates successful end of stream.
1184 }
1185
1186 return toRPCErr(err)
1187 }
1188 cs.receivedFirstMsg = true
1189 if a.trInfo != nil {
1190 a.mu.Lock()
1191 if a.trInfo.tr != nil {
1192 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1193 }
1194 a.mu.Unlock()
1195 }
1196 if a.statsHandler != nil {
1197 a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
1198 Client: true,
1199 RecvTime: time.Now(),
1200 Payload: m,
1201 WireLength: payInfo.compressedLength + headerLen,
1202 CompressedLength: payInfo.compressedLength,
1203 Length: payInfo.uncompressedBytes.Len(),
1204 })

Callers 1

RecvMsg (Method) · 0.45

Calls 15

GetCompressor (Function) · 0.92
Errorf (Function) · 0.92
Error (Function) · 0.92
acceptedCompressorAllows (Function) · 0.85
recv (Function) · 0.85
toRPCErr (Function) · 0.85
free (Method) · 0.80
Err (Method) · 0.80
Status (Method) · 0.80
Now (Method) · 0.80
RecvCompress (Method) · 0.65
Type (Method) · 0.65

Tested by

no test coverage detected