(m any, payInfo *payloadInfo)
| 1145 | } |
| 1146 | |
| 1147 | func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { |
| 1148 | cs := a.cs |
| 1149 | if a.statsHandler != nil && payInfo == nil { |
| 1150 | payInfo = &payloadInfo{} |
| 1151 | defer payInfo.free() |
| 1152 | } |
| 1153 | |
| 1154 | if !a.decompressorSet { |
| 1155 | // Block until we receive headers containing received message encoding. |
| 1156 | if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity { |
| 1157 | if a.decompressorV0 == nil || a.decompressorV0.Type() != ct { |
| 1158 | // No configured decompressor, or it does not match the incoming |
| 1159 | // message encoding; attempt to find a registered compressor that does. |
| 1160 | a.decompressorV0 = nil |
| 1161 | a.decompressorV1 = encoding.GetCompressor(ct) |
| 1162 | } |
| 1163 | // Validate that the compression method is acceptable for this call. |
| 1164 | if !acceptedCompressorAllows(cs.callInfo.acceptedResponseCompressors, ct) { |
| 1165 | return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct) |
| 1166 | } |
| 1167 | } else { |
| 1168 | // No compression is used; disable our decompressor. |
| 1169 | a.decompressorV0 = nil |
| 1170 | } |
| 1171 | // Only initialize this state once per stream. |
| 1172 | a.decompressorSet = true |
| 1173 | } |
| 1174 | if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil { |
| 1175 | if err == io.EOF { |
| 1176 | if statusErr := a.transportStream.Status().Err(); statusErr != nil { |
| 1177 | return statusErr |
| 1178 | } |
| 1179 | // Received no msg and status OK for non-server streaming rpcs. |
| 1180 | if !cs.desc.ServerStreams && !cs.receivedFirstMsg { |
| 1181 | return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") |
| 1182 | } |
| 1183 | return io.EOF // indicates successful end of stream. |
| 1184 | } |
| 1185 | |
| 1186 | return toRPCErr(err) |
| 1187 | } |
| 1188 | cs.receivedFirstMsg = true |
| 1189 | if a.trInfo != nil { |
| 1190 | a.mu.Lock() |
| 1191 | if a.trInfo.tr != nil { |
| 1192 | a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) |
| 1193 | } |
| 1194 | a.mu.Unlock() |
| 1195 | } |
| 1196 | if a.statsHandler != nil { |
| 1197 | a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{ |
| 1198 | Client: true, |
| 1199 | RecvTime: time.Now(), |
| 1200 | Payload: m, |
| 1201 | WireLength: payInfo.compressedLength + headerLen, |
| 1202 | CompressedLength: payInfo.compressedLength, |
| 1203 | Length: payInfo.uncompressedBytes.Len(), |
| 1204 | }) |
no test coverage detected