(f *parsedDataFrame)
| 763 | } |
| 764 | |
| 765 | func (t *http2Server) handleData(f *parsedDataFrame) { |
| 766 | size := f.Header().Length |
| 767 | var sendBDPPing bool |
| 768 | if t.bdpEst != nil { |
| 769 | sendBDPPing = t.bdpEst.add(size) |
| 770 | } |
| 771 | // Decouple connection's flow control from application's read. |
| 772 | // An update on connection's flow control should not depend on |
| 773 | // whether user application has read the data or not. Such a |
| 774 | // restriction is already imposed on the stream's flow control, |
| 775 | // and therefore the sender will be blocked anyway. |
| 776 | // Decoupling the connection flow control will prevent other |
| 777 | // active (fast) streams from starving in the presence of slow or |
| 778 | // inactive streams. |
| 779 | if w := t.fc.onData(size); w > 0 { |
| 780 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 781 | streamID: 0, |
| 782 | increment: w, |
| 783 | }) |
| 784 | } |
| 785 | if sendBDPPing { |
| 786 | // Avoid excessive ping detection (e.g. in an L7 proxy) |
| 787 | // by sending a window update prior to the BDP ping. |
| 788 | if w := t.fc.reset(); w > 0 { |
| 789 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 790 | streamID: 0, |
| 791 | increment: w, |
| 792 | }) |
| 793 | } |
| 794 | t.controlBuf.put(bdpPing) |
| 795 | } |
| 796 | // Select the right stream to dispatch. |
| 797 | s, ok := t.getStream(f) |
| 798 | if !ok { |
| 799 | return |
| 800 | } |
| 801 | if s.getState() == streamReadDone { |
| 802 | t.closeStream(s, true, http2.ErrCodeStreamClosed, false) |
| 803 | return |
| 804 | } |
| 805 | if size > 0 { |
| 806 | if err := s.fc.onData(size); err != nil { |
| 807 | t.closeStream(s, true, http2.ErrCodeFlowControl, false) |
| 808 | return |
| 809 | } |
| 810 | dataLen := f.data.Len() |
| 811 | if f.Header().Flags.Has(http2.FlagDataPadded) { |
| 812 | if w := s.fc.onRead(size - uint32(dataLen)); w > 0 { |
| 813 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) |
| 814 | } |
| 815 | } |
| 816 | if dataLen > 0 { |
| 817 | f.data.Ref() |
| 818 | s.write(recvMsg{buffer: f.data}) |
| 819 | } |
| 820 | } |
| 821 | if f.StreamEnded() { |
| 822 | // Received the end of stream from the client. |
no test coverage detected