MCPcopy
hub / github.com/nats-io/nats.go / Barrier

Method Barrier

nats.go:6334–6373  ·  view source on GitHub ↗

Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call.

(f func())

Source from the content-addressed store, hash-verified

6332// ErrConnectionClosed is returned if the connection is closed prior to
6333// the call.
6334func (nc *Conn) Barrier(f func()) error {
6335 nc.mu.Lock()
6336 if nc.isClosed() {
6337 nc.mu.Unlock()
6338 return ErrConnectionClosed
6339 }
6340 nc.subsMu.Lock()
6341 // Need to figure out how many non chan subscriptions there are
6342 numSubs := 0
6343 for _, sub := range nc.subs {
6344 if sub.typ == AsyncSubscription {
6345 numSubs++
6346 }
6347 }
6348 if numSubs == 0 {
6349 nc.subsMu.Unlock()
6350 nc.mu.Unlock()
6351 f()
6352 return nil
6353 }
6354 barrier := &barrierInfo{refs: int64(numSubs), f: f}
6355 for _, sub := range nc.subs {
6356 sub.mu.Lock()
6357 if sub.mch == nil {
6358 msg := &Msg{barrier: barrier}
6359 // Push onto the async pList
6360 if sub.pTail != nil {
6361 sub.pTail.next = msg
6362 } else {
6363 sub.pHead = msg
6364 sub.pCond.Signal()
6365 }
6366 sub.pTail = msg
6367 }
6368 sub.mu.Unlock()
6369 }
6370 nc.subsMu.Unlock()
6371 nc.mu.Unlock()
6372 return nil
6373}
6374
6375// ServerPool returns a copy of the current server pool for the connection.
6376//

Callers 1

TestBarrierFunction · 0.80

Calls 1

isClosed (Method) · 0.95

Tested by 1

TestBarrierFunction · 0.64