MCPcopy
hub / github.com/nats-io/nats.go / TestBarrier

Function TestBarrier

test/conn_test.go:2285–2496  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2283}
2284
2285func TestBarrier(t *testing.T) {
2286 s := RunDefaultServer()
2287 defer s.Shutdown()
2288
2289 nc := NewDefaultConnection(t)
2290 defer nc.Close()
2291
2292 pubMsgs := int32(0)
2293 ch := make(chan bool, 1)
2294
2295 sub1, err := nc.Subscribe("pub", func(_ *nats.Msg) {
2296 atomic.AddInt32(&pubMsgs, 1)
2297 time.Sleep(250 * time.Millisecond)
2298 })
2299 if err != nil {
2300 t.Fatalf("Error on subscribe: %v", err)
2301 }
2302
2303 sub2, err := nc.Subscribe("close", func(_ *nats.Msg) {
2304 // The "close" message was sent/received last, but
2305 // because we are dealing with different subscriptions,
2306 // which are dispatched by different dispatchers, and
2307 // because the "pub" subscription is delayed, this
2308 // callback is likely to be invoked before the sub1's
2309 // second callback is invoked. Using the Barrier call
2310 // here will ensure that the given function will be invoked
2311 // after the preceding messages have been dispatched.
2312 nc.Barrier(func() {
2313 res := atomic.LoadInt32(&pubMsgs) == 2
2314 ch <- res
2315 })
2316 })
2317 if err != nil {
2318 t.Fatalf("Error on subscribe: %v", err)
2319 }
2320
2321 // Send 2 "pub" messages followed by a "close" message
2322 for i := 0; i < 2; i++ {
2323 if err := nc.Publish("pub", []byte("pub msg")); err != nil {
2324 t.Fatalf("Error on publish: %v", err)
2325 }
2326 }
2327 if err := nc.Publish("close", []byte("closing")); err != nil {
2328 t.Fatalf("Error on publish: %v", err)
2329 }
2330
2331 select {
2332 case ok := <-ch:
2333 if !ok {
2334 t.Fatal("The barrier function was invoked before the second message")
2335 }
2336 case <-time.After(2 * time.Second):
2337 t.Fatal("Waited for too long...")
2338 }
2339
2340 // Remove all subs
2341 sub1.Unsubscribe()
2342 sub2.Unsubscribe()

Callers

nothing calls this directly

Calls 15

FatalfMethod · 0.80
BarrierMethod · 0.80
UnsubscribeMethod · 0.80
AutoUnsubscribeMethod · 0.80
NextMsgMethod · 0.80
TLSRequiredMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
WaitFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
SubscribeSyncMethod · 0.65

Tested by

no test coverage detected