processData removes the first stream from active streams, writes out at most 16KB of its data and then puts it at the end of activeStreams if there's still more data to be sent and stream has some stream-level flow control.
()
| 935 | // of its data and then puts it at the end of activeStreams if there's still more data |
| 936 | // to be sent and stream has some stream-level flow control. |
| 937 | func (l *loopyWriter) processData() (bool, error) { |
| 938 | if l.sendQuota == 0 { |
| 939 | return true, nil |
| 940 | } |
| 941 | str := l.activeStreams.dequeue() // Remove the first stream. |
| 942 | if str == nil { |
| 943 | return true, nil |
| 944 | } |
| 945 | reader := &str.reader |
| 946 | dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. |
| 947 | if !dataItem.processing { |
| 948 | dataItem.processing = true |
| 949 | reader.Reset(dataItem.data) |
| 950 | dataItem.data.Free() |
| 951 | } |
| 952 | // A data item is represented by a dataFrame, since it later translates into |
| 953 | // multiple HTTP2 data frames. |
| 954 | // Every dataFrame has two buffers; h that keeps grpc-message header and data |
| 955 | // that is the actual message. As an optimization to keep wire traffic low, data |
| 956 | // from data is copied to h to make as big as the maximum possible HTTP2 frame |
| 957 | // size. |
| 958 | |
| 959 | isEmpty := len(dataItem.h) == 0 && reader.Remaining() == 0 |
| 960 | // Figure out the maximum size we can send |
| 961 | maxSize := http2MaxFrameLen |
| 962 | strQuota := int(l.oiws) - str.bytesOutStanding |
| 963 | if strQuota <= 0 && !isEmpty { // stream-level flow control. |
| 964 | str.state = waitingOnStreamQuota |
| 965 | return false, nil |
| 966 | } |
| 967 | maxSize = min(maxSize, max(strQuota, 0)) |
| 968 | maxSize = min(maxSize, int(l.sendQuota)) // connection-level flow control. |
| 969 | // Compute how much of the header and data we can send within quota and max frame length |
| 970 | hSize := min(maxSize, len(dataItem.h)) |
| 971 | dSize := min(maxSize-hSize, reader.Remaining()) |
| 972 | remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize |
| 973 | size := hSize + dSize |
| 974 | |
| 975 | l.writeBuf = l.writeBuf[:0] |
| 976 | if hSize > 0 { |
| 977 | l.writeBuf = append(l.writeBuf, dataItem.h[:hSize]) |
| 978 | } |
| 979 | if dSize > 0 { |
| 980 | var err error |
| 981 | l.writeBuf, err = reader.Peek(dSize, l.writeBuf) |
| 982 | if err != nil { |
| 983 | // This must never happen since the reader must have at least dSize |
| 984 | // bytes. |
| 985 | // Log an error to fail tests. |
| 986 | l.logger.Errorf("unexpected error while reading Data frame payload: %v", err) |
| 987 | return false, err |
| 988 | } |
| 989 | } |
| 990 | |
| 991 | // Now that outgoing flow controls are checked we can replenish str's write quota |
| 992 | str.wq.replenish(size) |
| 993 | var endStream bool |
| 994 | // If this is the last data message on this stream and all of it can be written in this iteration. |