Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call.
(f func())
| 6332 | // ErrConnectionClosed is returned if the connection is closed prior to |
| 6333 | // the call. |
| 6334 | func (nc *Conn) Barrier(f func()) error { |
| 6335 | nc.mu.Lock() |
| 6336 | if nc.isClosed() { |
| 6337 | nc.mu.Unlock() |
| 6338 | return ErrConnectionClosed |
| 6339 | } |
| 6340 | nc.subsMu.Lock() |
| 6341 | // Need to figure out how many non chan subscriptions there are |
| 6342 | numSubs := 0 |
| 6343 | for _, sub := range nc.subs { |
| 6344 | if sub.typ == AsyncSubscription { |
| 6345 | numSubs++ |
| 6346 | } |
| 6347 | } |
| 6348 | if numSubs == 0 { |
| 6349 | nc.subsMu.Unlock() |
| 6350 | nc.mu.Unlock() |
| 6351 | f() |
| 6352 | return nil |
| 6353 | } |
| 6354 | barrier := &barrierInfo{refs: int64(numSubs), f: f} |
| 6355 | for _, sub := range nc.subs { |
| 6356 | sub.mu.Lock() |
| 6357 | if sub.mch == nil { |
| 6358 | msg := &Msg{barrier: barrier} |
| 6359 | // Push onto the async pList |
| 6360 | if sub.pTail != nil { |
| 6361 | sub.pTail.next = msg |
| 6362 | } else { |
| 6363 | sub.pHead = msg |
| 6364 | sub.pCond.Signal() |
| 6365 | } |
| 6366 | sub.pTail = msg |
| 6367 | } |
| 6368 | sub.mu.Unlock() |
| 6369 | } |
| 6370 | nc.subsMu.Unlock() |
| 6371 | nc.mu.Unlock() |
| 6372 | return nil |
| 6373 | } |
| 6374 | |
| 6375 | // ServerPool returns a copy of the current server pool for the connection. |
| 6376 | // |