Subscribe registers the provided Subscriber to the PubSub. If the PubSub contains a previously published message, the Subscriber's OnMessage() callback will be invoked asynchronously with the existing message to begin with, and subsequently for every newly published message. The caller is responsi
(sub Subscriber)
| 70 | // The caller is responsible for invoking the returned cancel function to |
| 71 | // unsubscribe itself from the PubSub. |
| 72 | func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) { |
| 73 | ps.mu.Lock() |
| 74 | defer ps.mu.Unlock() |
| 75 | |
| 76 | ps.subscribers[sub] = true |
| 77 | |
| 78 | if ps.msg != nil { |
| 79 | msg := ps.msg |
| 80 | ps.cs.TrySchedule(func(context.Context) { |
| 81 | ps.mu.Lock() |
| 82 | defer ps.mu.Unlock() |
| 83 | if !ps.subscribers[sub] { |
| 84 | return |
| 85 | } |
| 86 | sub.OnMessage(msg) |
| 87 | }) |
| 88 | } |
| 89 | |
| 90 | return func() { |
| 91 | ps.mu.Lock() |
| 92 | defer ps.mu.Unlock() |
| 93 | delete(ps.subscribers, sub) |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | // Publish publishes the provided message to the PubSub, and invokes |
| 98 | // callbacks registered by subscribers asynchronously. |