initMsgChan must be in sync with initAllChan.
()
| 747 | |
| 748 | // initMsgChan must be in sync with initAllChan. |
| 749 | func (c *channel) initMsgChan() { |
| 750 | ctx := context.TODO() |
| 751 | c.msgCh = make(chan *Message, c.chanSize) |
| 752 | |
| 753 | go func() { |
| 754 | timer := time.NewTimer(time.Minute) |
| 755 | timer.Stop() |
| 756 | |
| 757 | var errCount int |
| 758 | for { |
| 759 | msg, err := c.pubSub.Receive(ctx) |
| 760 | if err != nil { |
| 761 | if err == pool.ErrClosed { |
| 762 | close(c.msgCh) |
| 763 | return |
| 764 | } |
| 765 | if errCount > 0 { |
| 766 | time.Sleep(100 * time.Millisecond) |
| 767 | } |
| 768 | errCount++ |
| 769 | continue |
| 770 | } |
| 771 | |
| 772 | errCount = 0 |
| 773 | |
| 774 | // Any message is as good as a ping. |
| 775 | select { |
| 776 | case c.ping <- struct{}{}: |
| 777 | default: |
| 778 | } |
| 779 | |
| 780 | switch msg := msg.(type) { |
| 781 | case *Subscription: |
| 782 | // Ignore. |
| 783 | case *Pong: |
| 784 | // Ignore. |
| 785 | case *Message: |
| 786 | timer.Reset(c.chanSendTimeout) |
| 787 | select { |
| 788 | case c.msgCh <- msg: |
| 789 | if !timer.Stop() { |
| 790 | <-timer.C |
| 791 | } |
| 792 | case <-timer.C: |
| 793 | internal.Logger.Printf( |
| 794 | ctx, "redis: %v channel is full for %s (message is dropped)", |
| 795 | c, c.chanSendTimeout) |
| 796 | } |
| 797 | default: |
| 798 | internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) |
| 799 | } |
| 800 | } |
| 801 | }() |
| 802 | } |
| 803 | |
| 804 | // initAllChan must be in sync with initMsgChan. |
| 805 | func (c *channel) initAllChan() { |