MCPcopy
hub / github.com/redis/go-redis / initAllChan

Method initAllChan

pubsub.go:805–856  ·  view source on GitHub ↗

initAllChan must be in sync with initMsgChan.

()

Source from the content-addressed store, hash-verified

803
804// initAllChan must be in sync with initMsgChan.
805func (c *channel) initAllChan() {
806 ctx := context.TODO()
807 c.allCh = make(chan interface{}, c.chanSize)
808
809 go func() {
810 timer := time.NewTimer(time.Minute)
811 timer.Stop()
812
813 var errCount int
814 for {
815 msg, err := c.pubSub.Receive(ctx)
816 if err != nil {
817 if err == pool.ErrClosed {
818 close(c.allCh)
819 return
820 }
821 if errCount > 0 {
822 time.Sleep(100 * time.Millisecond)
823 }
824 errCount++
825 continue
826 }
827
828 errCount = 0
829
830 // Any message is as good as a ping.
831 select {
832 case c.ping <- struct{}{}:
833 default:
834 }
835
836 switch msg := msg.(type) {
837 case *Pong:
838 // Ignore.
839 case *Subscription, *Message:
840 timer.Reset(c.chanSendTimeout)
841 select {
842 case c.allCh <- msg:
843 if !timer.Stop() {
844 <-timer.C
845 }
846 case <-timer.C:
847 internal.Logger.Printf(
848 ctx, "redis: %v channel is full for %s (message is dropped)",
849 c, c.chanSendTimeout)
850 }
851 default:
852 internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
853 }
854 }
855 }()
856}

Callers 1

Calls 4

ReceiveMethod · 0.80
StopMethod · 0.65
PrintfMethod · 0.65
ResetMethod · 0.45

Tested by

no test coverage detected