| 1538 | } |
| 1539 | |
| 1540 | func (c *Client) pubSub() *PubSub { |
| 1541 | pubsub := &PubSub{ |
| 1542 | opt: c.opt, |
| 1543 | newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) { |
| 1544 | cn, err := c.pubSubPool.NewConn(ctx, c.opt.Network, addr, channels) |
| 1545 | if err != nil { |
| 1546 | return nil, err |
| 1547 | } |
| 1548 | // will return nil if already initialized |
| 1549 | err = c.initConn(ctx, cn) |
| 1550 | if err != nil { |
| 1551 | _ = cn.Close() |
| 1552 | return nil, err |
| 1553 | } |
| 1554 | // Track connection in PubSubPool |
| 1555 | c.pubSubPool.TrackConn(cn) |
| 1556 | return cn, nil |
| 1557 | }, |
| 1558 | closeConn: func(cn *pool.Conn) error { |
| 1559 | // Untrack connection from PubSubPool |
| 1560 | c.pubSubPool.UntrackConn(cn) |
| 1561 | _ = cn.Close() |
| 1562 | return nil |
| 1563 | }, |
| 1564 | pushProcessor: c.pushProcessor, |
| 1565 | } |
| 1566 | pubsub.init() |
| 1567 | |
| 1568 | return pubsub |
| 1569 | } |
| 1570 | |
| 1571 | // Subscribe subscribes the client to the specified channels. |
| 1572 | // Channels can be omitted to create empty subscription. |