| 1008 | } |
| 1009 | |
| 1010 | func (cs *clientStream) RecvMsg(m any) error { |
| 1011 | if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged { |
| 1012 | // Call Header() to binary log header if it's not already logged. |
| 1013 | cs.Header() |
| 1014 | } |
| 1015 | var recvInfo *payloadInfo |
| 1016 | if len(cs.binlogs) != 0 { |
| 1017 | recvInfo = &payloadInfo{} |
| 1018 | defer recvInfo.free() |
| 1019 | } |
| 1020 | err := cs.withRetry(func(a *csAttempt) error { |
| 1021 | return a.recvMsg(m, recvInfo) |
| 1022 | }, cs.commitAttemptLocked) |
| 1023 | if len(cs.binlogs) != 0 && err == nil { |
| 1024 | sm := &binarylog.ServerMessage{ |
| 1025 | OnClientSide: true, |
| 1026 | Message: recvInfo.uncompressedBytes.Materialize(), |
| 1027 | } |
| 1028 | for _, binlog := range cs.binlogs { |
| 1029 | binlog.Log(cs.ctx, sm) |
| 1030 | } |
| 1031 | } |
| 1032 | if err != nil || !cs.desc.ServerStreams { |
| 1033 | // err != nil or non-server-streaming indicates end of stream. |
| 1034 | cs.finish(err) |
| 1035 | } |
| 1036 | return err |
| 1037 | } |
| 1038 | |
| 1039 | func (cs *clientStream) CloseSend() error { |
| 1040 | if cs.sentLast { |