MCPcopy
hub / github.com/grpc/grpc-go / processData

Method processData

internal/transport/controlbuf.go:937–1020  ·  view source on GitHub ↗

processData removes the first stream from active streams, writes out at most 16KB of its data and then puts it at the end of activeStreams if there's still more data to be sent and stream has some stream-level flow control.

()

Source from the content-addressed store, hash-verified

935// of its data and then puts it at the end of activeStreams if there's still more data
936// to be sent and stream has some stream-level flow control.
937func (l *loopyWriter) processData() (bool, error) {
938 if l.sendQuota == 0 {
939 return true, nil
940 }
941 str := l.activeStreams.dequeue() // Remove the first stream.
942 if str == nil {
943 return true, nil
944 }
945 reader := &str.reader
946 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
947 if !dataItem.processing {
948 dataItem.processing = true
949 reader.Reset(dataItem.data)
950 dataItem.data.Free()
951 }
952 // A data item is represented by a dataFrame, since it later translates into
953 // multiple HTTP2 data frames.
954 // Every dataFrame has two buffers; h that keeps grpc-message header and data
955 // that is the actual message. As an optimization to keep wire traffic low, data
956 // from data is copied to h to make as big as the maximum possible HTTP2 frame
957 // size.
958
959 isEmpty := len(dataItem.h) == 0 && reader.Remaining() == 0
960 // Figure out the maximum size we can send
961 maxSize := http2MaxFrameLen
962 strQuota := int(l.oiws) - str.bytesOutStanding
963 if strQuota <= 0 && !isEmpty { // stream-level flow control.
964 str.state = waitingOnStreamQuota
965 return false, nil
966 }
967 maxSize = min(maxSize, max(strQuota, 0))
968 maxSize = min(maxSize, int(l.sendQuota)) // connection-level flow control.
969 // Compute how much of the header and data we can send within quota and max frame length
970 hSize := min(maxSize, len(dataItem.h))
971 dSize := min(maxSize-hSize, reader.Remaining())
972 remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
973 size := hSize + dSize
974
975 l.writeBuf = l.writeBuf[:0]
976 if hSize > 0 {
977 l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
978 }
979 if dSize > 0 {
980 var err error
981 l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
982 if err != nil {
983 // This must never happen since the reader must have at least dSize
984 // bytes.
985 // Log an error to fail tests.
986 l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
987 return false, err
988 }
989 }
990
991 // Now that outgoing flow controls are checked we can replenish str's write quota
992 str.wq.replenish(size)
993 var endStream bool
994 // If this is the last data message on this stream and all of it can be written in this iteration.

Callers 1

runMethod · 0.95

Calls 11

peekMethod · 0.80
RemainingMethod · 0.80
PeekMethod · 0.80
DiscardMethod · 0.80
FreeMethod · 0.65
ErrorfMethod · 0.65
CloseMethod · 0.65
dequeueMethod · 0.45
ResetMethod · 0.45
writeDataMethod · 0.45

Tested by

no test coverage detected