(t *testing.T)
| 2283 | } |
| 2284 | |
| 2285 | func TestBarrier(t *testing.T) { |
| 2286 | s := RunDefaultServer() |
| 2287 | defer s.Shutdown() |
| 2288 | |
| 2289 | nc := NewDefaultConnection(t) |
| 2290 | defer nc.Close() |
| 2291 | |
| 2292 | pubMsgs := int32(0) |
| 2293 | ch := make(chan bool, 1) |
| 2294 | |
| 2295 | sub1, err := nc.Subscribe("pub", func(_ *nats.Msg) { |
| 2296 | atomic.AddInt32(&pubMsgs, 1) |
| 2297 | time.Sleep(250 * time.Millisecond) |
| 2298 | }) |
| 2299 | if err != nil { |
| 2300 | t.Fatalf("Error on subscribe: %v", err) |
| 2301 | } |
| 2302 | |
| 2303 | sub2, err := nc.Subscribe("close", func(_ *nats.Msg) { |
| 2304 | // The "close" message was sent/received lat, but |
| 2305 | // because we are dealing with different subscriptions, |
| 2306 | // which are dispatched by different dispatchers, and |
| 2307 | // because the "pub" subscription is delayed, this |
| 2308 | // callback is likely to be invoked before the sub1's |
| 2309 | // second callback is invoked. Using the Barrier call |
| 2310 | // here will ensure that the given function will be invoked |
| 2311 | // after the preceding messages have been dispatched. |
| 2312 | nc.Barrier(func() { |
| 2313 | res := atomic.LoadInt32(&pubMsgs) == 2 |
| 2314 | ch <- res |
| 2315 | }) |
| 2316 | }) |
| 2317 | if err != nil { |
| 2318 | t.Fatalf("Error on subscribe: %v", err) |
| 2319 | } |
| 2320 | |
| 2321 | // Send 2 "pub" messages followed by a "close" message |
| 2322 | for i := 0; i < 2; i++ { |
| 2323 | if err := nc.Publish("pub", []byte("pub msg")); err != nil { |
| 2324 | t.Fatalf("Error on publish: %v", err) |
| 2325 | } |
| 2326 | } |
| 2327 | if err := nc.Publish("close", []byte("closing")); err != nil { |
| 2328 | t.Fatalf("Error on publish: %v", err) |
| 2329 | } |
| 2330 | |
| 2331 | select { |
| 2332 | case ok := <-ch: |
| 2333 | if !ok { |
| 2334 | t.Fatal("The barrier function was invoked before the second message") |
| 2335 | } |
| 2336 | case <-time.After(2 * time.Second): |
| 2337 | t.Fatal("Waited for too long...") |
| 2338 | } |
| 2339 | |
| 2340 | // Remove all subs |
| 2341 | sub1.Unsubscribe() |
| 2342 | sub2.Unsubscribe() |
nothing calls this directly
no test coverage detected