maintenance notifications won't work here for now
()
| 2183 | |
| 2184 | // Maintenance notifications won't work for PubSub instances created by ClusterClient for now. |
| 2185 | func (c *ClusterClient) pubSub() *PubSub { |
| 2186 | var node *clusterNode |
| 2187 | pubsub := &PubSub{ |
| 2188 | opt: c.opt.clientOptions(), |
| 2189 | newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) { |
| 2190 | if node != nil { |
| 2191 | panic("node != nil") |
| 2192 | } |
| 2193 | |
| 2194 | var err error |
| 2195 | |
| 2196 | if len(channels) > 0 { |
| 2197 | slot := hashtag.Slot(channels[0]) |
| 2198 | |
| 2199 | // newConn in PubSub is only used for subscription connections, so it is safe to |
| 2200 | // assume that a slave node can always be used when client options specify ReadOnly. |
| 2201 | if c.opt.ReadOnly { |
| 2202 | state, err := c.state.Get(ctx) |
| 2203 | if err != nil { |
| 2204 | return nil, err |
| 2205 | } |
| 2206 | |
| 2207 | node, err = c.slotReadOnlyNode(state, slot) |
| 2208 | if err != nil { |
| 2209 | return nil, err |
| 2210 | } |
| 2211 | } else { |
| 2212 | node, err = c.slotMasterNode(ctx, slot) |
| 2213 | if err != nil { |
| 2214 | return nil, err |
| 2215 | } |
| 2216 | } |
| 2217 | } else { |
| 2218 | node, err = c.nodes.Random() |
| 2219 | if err != nil { |
| 2220 | return nil, err |
| 2221 | } |
| 2222 | } |
| 2223 | cn, err := node.Client.pubSubPool.NewConn(ctx, node.Client.opt.Network, node.Client.opt.Addr, channels) |
| 2224 | if err != nil { |
| 2225 | node = nil |
| 2226 | return nil, err |
| 2227 | } |
| 2228 | // will return nil if already initialized |
| 2229 | err = node.Client.initConn(ctx, cn) |
| 2230 | if err != nil { |
| 2231 | _ = cn.Close() |
| 2232 | node = nil |
| 2233 | return nil, err |
| 2234 | } |
| 2235 | node.Client.pubSubPool.TrackConn(cn) |
| 2236 | return cn, nil |
| 2237 | }, |
| 2238 | closeConn: func(cn *pool.Conn) error { |
| 2239 | // Untrack connection from PubSubPool |
| 2240 | node.Client.pubSubPool.UntrackConn(cn) |
| 2241 | err := cn.Close() |
| 2242 | node = nil |
no test coverage detected