MCPcopy
hub / github.com/redis/go-redis / initMsgChan

Method initMsgChan

pubsub.go:749–802  ·  view source on GitHub ↗

initMsgChan must be in sync with initAllChan.

()

Source from the content-addressed store, hash-verified

747
748// initMsgChan must be in sync with initAllChan.
749func (c *channel) initMsgChan() {
750 ctx := context.TODO()
751 c.msgCh = make(chan *Message, c.chanSize)
752
753 go func() {
754 timer := time.NewTimer(time.Minute)
755 timer.Stop()
756
757 var errCount int
758 for {
759 msg, err := c.pubSub.Receive(ctx)
760 if err != nil {
761 if err == pool.ErrClosed {
762 close(c.msgCh)
763 return
764 }
765 if errCount > 0 {
766 time.Sleep(100 * time.Millisecond)
767 }
768 errCount++
769 continue
770 }
771
772 errCount = 0
773
774 // Any message is as good as a ping.
775 select {
776 case c.ping <- struct{}{}:
777 default:
778 }
779
780 switch msg := msg.(type) {
781 case *Subscription:
782 // Ignore.
783 case *Pong:
784 // Ignore.
785 case *Message:
786 timer.Reset(c.chanSendTimeout)
787 select {
788 case c.msgCh <- msg:
789 if !timer.Stop() {
790 <-timer.C
791 }
792 case <-timer.C:
793 internal.Logger.Printf(
794 ctx, "redis: %v channel is full for %s (message is dropped)",
795 c, c.chanSendTimeout)
796 }
797 default:
798 internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
799 }
800 }
801 }()
802}
803
804// initAllChan must be in sync with initMsgChan.
805func (c *channel) initAllChan() {

Callers 1

ChannelMethod · 0.80

Calls 4

ReceiveMethod · 0.80
StopMethod · 0.65
PrintfMethod · 0.65
ResetMethod · 0.45

Tested by

no test coverage detected