initAllChan must be in sync with initMsgChan.
()
| 803 | |
| 804 | // initAllChan must be in sync with initMsgChan. |
| 805 | func (c *channel) initAllChan() { |
| 806 | ctx := context.TODO() |
| 807 | c.allCh = make(chan interface{}, c.chanSize) |
| 808 | |
| 809 | go func() { |
| 810 | timer := time.NewTimer(time.Minute) |
| 811 | timer.Stop() |
| 812 | |
| 813 | var errCount int |
| 814 | for { |
| 815 | msg, err := c.pubSub.Receive(ctx) |
| 816 | if err != nil { |
| 817 | if err == pool.ErrClosed { |
| 818 | close(c.allCh) |
| 819 | return |
| 820 | } |
| 821 | if errCount > 0 { |
| 822 | time.Sleep(100 * time.Millisecond) |
| 823 | } |
| 824 | errCount++ |
| 825 | continue |
| 826 | } |
| 827 | |
| 828 | errCount = 0 |
| 829 | |
| 830 | // Any message is as good as a ping. |
| 831 | select { |
| 832 | case c.ping <- struct{}{}: |
| 833 | default: |
| 834 | } |
| 835 | |
| 836 | switch msg := msg.(type) { |
| 837 | case *Pong: |
| 838 | // Ignore. |
| 839 | case *Subscription, *Message: |
| 840 | timer.Reset(c.chanSendTimeout) |
| 841 | select { |
| 842 | case c.allCh <- msg: |
| 843 | if !timer.Stop() { |
| 844 | <-timer.C |
| 845 | } |
| 846 | case <-timer.C: |
| 847 | internal.Logger.Printf( |
| 848 | ctx, "redis: %v channel is full for %s (message is dropped)", |
| 849 | c, c.chanSendTimeout) |
| 850 | } |
| 851 | default: |
| 852 | internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) |
| 853 | } |
| 854 | } |
| 855 | }() |
| 856 | } |
no test coverage detected