(m any)
| 937 | } |
| 938 | |
| 939 | func (cs *clientStream) SendMsg(m any) (err error) { |
| 940 | defer func() { |
| 941 | if err != nil && err != io.EOF { |
| 942 | // Call finish on the client stream for errors generated by this SendMsg |
| 943 | // call, as these indicate problems created by this client. (Transport |
| 944 | // errors are converted to an io.EOF error in csAttempt.sendMsg; the real |
| 945 | // error will be returned from RecvMsg eventually in that case, or be |
| 946 | // retried.) |
| 947 | cs.finish(err) |
| 948 | } |
| 949 | }() |
| 950 | if cs.sentLast { |
| 951 | return status.Errorf(codes.Internal, "SendMsg called after CloseSend") |
| 952 | } |
| 953 | if !cs.desc.ClientStreams { |
| 954 | cs.sentLast = true |
| 955 | } |
| 956 | |
| 957 | // load hdr, payload, data |
| 958 | hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool) |
| 959 | if err != nil { |
| 960 | return err |
| 961 | } |
| 962 | |
| 963 | defer func() { |
| 964 | data.Free() |
| 965 | // only free payload if compression was made, and therefore it is a different set |
| 966 | // of buffers from data. |
| 967 | if pf.isCompressed() { |
| 968 | payload.Free() |
| 969 | } |
| 970 | }() |
| 971 | |
| 972 | dataLen := data.Len() |
| 973 | payloadLen := payload.Len() |
| 974 | // TODO(dfawley): should we be checking len(data) instead? |
| 975 | if payloadLen > *cs.callInfo.maxSendMessageSize { |
| 976 | return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize) |
| 977 | } |
| 978 | |
| 979 | // always take an extra ref in case data == payload (i.e. when the data isn't |
| 980 | // compressed). The original ref will always be freed by the deferred free above. |
| 981 | payload.Ref() |
| 982 | op := func(a *csAttempt) error { |
| 983 | return a.sendMsg(m, hdr, payload, dataLen, payloadLen) |
| 984 | } |
| 985 | |
| 986 | // onSuccess is invoked when the op is captured for a subsequent retry. If the |
| 987 | // stream was established by a previous message and therefore retries are |
| 988 | // disabled, onSuccess will not be invoked, and payloadRef can be freed |
| 989 | // immediately. |
| 990 | onSuccessCalled := false |
| 991 | err = cs.withRetry(op, func() { |
| 992 | cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free) |
| 993 | onSuccessCalled = true |
| 994 | }) |
| 995 | if !onSuccessCalled { |
| 996 | payload.Free() |
nothing calls this directly
no test coverage detected