MCPcopy
hub / github.com/redis/go-redis / startPubSubConnections

Function startPubSubConnections

maintnotifications/e2e/cluster_maintnotif_test.go:51–125  ·  view source on GitHub ↗

startPubSubConnections creates pubsub subscriptions on channels spread across different slots to ensure pubsub connections are active during slot migrations. Returns a stop function. The pubsub connections use sharded pubsub (SSUBSCRIBE) to ensure they're on specific shards. Returns nil stop functio

(ctx context.Context, t *testing.T, client redis.UniversalClient)

Source from the content-addressed store, hash-verified

49// Returns nil stop function if client is not a ClusterClient.
50// The stop function waits for all goroutines to finish before returning.
51func startPubSubConnections(ctx context.Context, t *testing.T, client redis.UniversalClient) (stop func()) {
52 clusterClient, ok := client.(*redis.ClusterClient)
53 if !ok {
54 t.Log("Skipping pubsub connections - client is not a ClusterClient")
55 return func() {}
56 }
57 stopChan := make(chan struct{})
58 var pubsubs []*redis.PubSub
59 var wg sync.WaitGroup
60
61 // Create pubsub subscriptions on channels in different slots
62 // Use channels that hash to slots spread across shards: 0, 1001, 2002, ...
63 for slot := 0; slot < 16384; slot += 1001 {
64 channelName := fmt.Sprintf("test-channel-%s", slotKeys[slot])
65
66 // Use sharded pubsub (SSUBSCRIBE) for cluster-aware subscriptions
67 pubsub := clusterClient.SSubscribe(ctx, channelName)
68 pubsubs = append(pubsubs, pubsub)
69
70 // Start a goroutine to receive messages (keeps connection active)
71 wg.Add(1)
72 go func(ps *redis.PubSub, ch string) {
73 defer wg.Done()
74 msgCh := ps.Channel()
75 for {
76 select {
77 case <-stopChan:
78 return
79 case msg, ok := <-msgCh:
80 if !ok {
81 return
82 }
83 if debugE2E() {
84 t.Logf("PubSub received message on %s: %v", ch, msg)
85 }
86 }
87 }
88 }(pubsub, channelName)
89 }
90
91 // Start a goroutine to publish messages periodically to keep connections active
92 wg.Add(1)
93 go func() {
94 defer wg.Done()
95 ticker := time.NewTicker(100 * time.Millisecond)
96 defer ticker.Stop()
97 msgCount := 0
98 for {
99 select {
100 case <-stopChan:
101 return
102 case <-ticker.C:
103 // Publish to channels in different slots
104 for slot := 0; slot < 16384; slot += 1001 {
105 channelName := fmt.Sprintf("test-channel-%s", slotKeys[slot])
106 // Use SPublish for sharded pubsub
107 _ = clusterClient.SPublish(ctx, channelName, fmt.Sprintf("msg-%d", msgCount)).Err()
108 }

Calls 9

debugE2EFunction · 0.85
ChannelMethod · 0.80
WaitMethod · 0.80
SSubscribeMethod · 0.65
AddMethod · 0.65
StopMethod · 0.65
ErrMethod · 0.65
SPublishMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected