MCPcopy
hub / github.com/redis/go-redis / conn

Method conn

pubsub.go:74–114  ·  view source on GitHub ↗
(ctx context.Context, newChannels []string)

Source from the content-addressed store, hash-verified

72}
73
74func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, error) {
75 if c.closed {
76 return nil, pool.ErrClosed
77 }
78 if c.cn != nil {
79 return c.cn, nil
80 }
81
82 if c.opt.Addr == "" {
83 // TODO(maintenanceNotifications):
84 // this is probably cluster client
85 // c.newConn will ignore the addr argument
86 // will be changed when we have maintenanceNotifications upgrades for cluster clients
87 c.opt.Addr = internal.RedisNull
88 }
89
90 // Include c.schannels so reconnect-time routing of an SSubscribe-only
91 // PubSub picks the slot owner (channels[0] in ClusterClient.pubSub()'s
92 // newConn closure) instead of a random node.
93 // See https://github.com/redis/go-redis/issues/3806.
94 // c.patterns is intentionally NOT included: patterns are not slot-
95 // addressable, and adding them would force PSubscribe-only PubSubs to
96 // pin to a single node based on pattern-string hash, regressing the
97 // existing random-node behaviour.
98 channels := slices.Collect(maps.Keys(c.channels))
99 channels = append(channels, slices.Collect(maps.Keys(c.schannels))...)
100 channels = append(channels, newChannels...)
101
102 cn, err := c.newConn(ctx, c.opt.Addr, channels)
103 if err != nil {
104 return nil, err
105 }
106
107 if err := c.resubscribe(ctx, cn); err != nil {
108 _ = c.closeConn(cn)
109 return nil, err
110 }
111
112 c.cn = cn
113 return cn, nil
114}
115
116func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
117 return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {

Callers 6

connWithLockMethod · 0.95
reconnectMethod · 0.95
subscribeMethod · 0.95
PingMethod · 0.95
ClientSetNameMethod · 0.95

Calls 5

resubscribeMethod · 0.95
CollectMethod · 0.80
newConnMethod · 0.80
closeConnMethod · 0.80
KeysMethod · 0.65