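HandleStreams receives incoming streams using the given handler. This is typically run in a separate goroutine. (The source comment also says "traceCtx attaches trace to ctx and returns the new context", but the signature below has no traceCtx parameter, so that sentence appears to be stale.)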
(ctx context.Context, handle func(*ServerStream))
| 637 | // typically run in a separate goroutine. |
| 638 | // NOTE(review): the sentence "traceCtx attaches trace to ctx and returns the new context" appears stale; HandleStreams takes only ctx and handle. |
| 639 | func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) { |
| 640 | defer func() { |
| 641 | close(t.readerDone) |
| 642 | <-t.loopyWriterDone |
| 643 | }() |
| 644 | for { |
| 645 | t.controlBuf.throttle() |
| 646 | frame, err := t.framer.readFrame() |
| 647 | atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) |
| 648 | if err != nil { |
| 649 | if se, ok := err.(http2.StreamError); ok { |
| 650 | if t.logger.V(logLevel) { |
| 651 | t.logger.Warningf("Encountered http2.StreamError: %v", se) |
| 652 | } |
| 653 | t.mu.Lock() |
| 654 | s := t.activeStreams[se.StreamID] |
| 655 | t.mu.Unlock() |
| 656 | if s != nil { |
| 657 | t.closeStream(s, true, se.Code, false) |
| 658 | } else { |
| 659 | t.controlBuf.put(&cleanupStream{ |
| 660 | streamID: se.StreamID, |
| 661 | rst: true, |
| 662 | rstCode: se.Code, |
| 663 | onWrite: func() {}, |
| 664 | }) |
| 665 | } |
| 666 | continue |
| 667 | } |
| 668 | t.Close(err) |
| 669 | return |
| 670 | } |
| 671 | switch frame := frame.(type) { |
| 672 | case *http2.MetaHeadersFrame: |
| 673 | if err := t.operateHeaders(ctx, frame, handle); err != nil { |
| 674 | // Any error processing client headers, e.g. invalid stream ID, |
| 675 | // is considered a protocol violation. |
| 676 | t.controlBuf.put(&goAway{ |
| 677 | code: http2.ErrCodeProtocol, |
| 678 | debugData: []byte(err.Error()), |
| 679 | closeConn: err, |
| 680 | }) |
| 681 | continue |
| 682 | } |
| 683 | case *parsedDataFrame: |
| 684 | t.handleData(frame) |
| 685 | frame.data.Free() |
| 686 | case *http2.RSTStreamFrame: |
| 687 | t.handleRSTStream(frame) |
| 688 | case *http2.SettingsFrame: |
| 689 | t.handleSettings(frame) |
| 690 | case *http2.PingFrame: |
| 691 | t.handlePing(frame) |
| 692 | case *http2.WindowUpdateFrame: |
| 693 | t.handleWindowUpdate(frame) |
| 694 | case *http2.GoAwayFrame: |
| 695 | // TODO: Handle GoAway from the client appropriately. |
| 696 | default: |
nothing calls this directly
no test coverage detected