(m any)
| 1792 | } |
| 1793 | |
| 1794 | func (ss *serverStream) RecvMsg(m any) (err error) { |
| 1795 | defer func() { |
| 1796 | if ss.trInfo != nil { |
| 1797 | ss.mu.Lock() |
| 1798 | if ss.trInfo.tr != nil { |
| 1799 | if err == nil { |
| 1800 | ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) |
| 1801 | } else if err != io.EOF { |
| 1802 | ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true) |
| 1803 | ss.trInfo.tr.SetError() |
| 1804 | } |
| 1805 | } |
| 1806 | ss.mu.Unlock() |
| 1807 | } |
| 1808 | if err != nil && err != io.EOF { |
| 1809 | st, _ := status.FromError(toRPCErr(err)) |
| 1810 | ss.s.WriteStatus(st) |
| 1811 | // Non-user specified status was sent out. This should be an error |
| 1812 | // case (as a server side Cancel maybe). |
| 1813 | // |
| 1814 | // This is not handled specifically now. User will return a final |
| 1815 | // status from the service handler, we will log that error instead. |
| 1816 | // This behavior is similar to an interceptor. |
| 1817 | } |
| 1818 | }() |
| 1819 | var payInfo *payloadInfo |
| 1820 | if ss.statsHandler != nil || len(ss.binlogs) != 0 { |
| 1821 | payInfo = &payloadInfo{} |
| 1822 | defer payInfo.free() |
| 1823 | } |
| 1824 | if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil { |
| 1825 | if err == io.EOF { |
| 1826 | if len(ss.binlogs) != 0 { |
| 1827 | chc := &binarylog.ClientHalfClose{} |
| 1828 | for _, binlog := range ss.binlogs { |
| 1829 | binlog.Log(ss.ctx, chc) |
| 1830 | } |
| 1831 | } |
| 1832 | // Received no request msg for non-client streaming rpcs. |
| 1833 | if !ss.desc.ClientStreams && !ss.recvFirstMsg { |
| 1834 | return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC") |
| 1835 | } |
| 1836 | return err |
| 1837 | } |
| 1838 | if err == io.ErrUnexpectedEOF { |
| 1839 | err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error()) |
| 1840 | } |
| 1841 | return toRPCErr(err) |
| 1842 | } |
| 1843 | ss.recvFirstMsg = true |
| 1844 | if ss.statsHandler != nil { |
| 1845 | ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ |
| 1846 | RecvTime: time.Now(), |
| 1847 | Payload: m, |
| 1848 | Length: payInfo.uncompressedBytes.Len(), |
| 1849 | WireLength: payInfo.compressedLength + headerLen, |
| 1850 | CompressedLength: payInfo.compressedLength, |
| 1851 | }) |
nothing calls this directly
no test coverage detected