MCPcopy
hub / github.com/nats-io/nats.go / TestSubscribeIterator

Function TestSubscribeIterator

test/nats_iter_test.go:29–225  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

27)
28
29func TestSubscribeIterator(t *testing.T) {
30 t.Run("with timeout", func(t *testing.T) {
31 s := RunServerOnPort(-1)
32 defer s.Shutdown()
33
34 nc, err := nats.Connect(s.ClientURL(), nats.PermissionErrOnSubscribe(true))
35 if err != nil {
36 t.Fatalf("Error on connect: %v", err)
37 }
38 defer nc.Close()
39
40 sub, err := nc.SubscribeSync("foo")
41 if err != nil {
42 t.Fatal("Failed to subscribe: ", err)
43 }
44 defer sub.Unsubscribe()
45
46 total := 100
47 for i := 0; i < total/2; i++ {
48 if err := nc.Publish("foo", []byte("Hello")); err != nil {
49 t.Fatalf("Error on publish: %v", err)
50 }
51 }
52
53 // publish some more messages asynchronously
54 errCh := make(chan error, 1)
55 go func() {
56 for i := 0; i < total/2; i++ {
57 if err := nc.Publish("foo", []byte("Hello")); err != nil {
58 errCh <- err
59 return
60 }
61 time.Sleep(10 * time.Millisecond)
62 }
63 close(errCh)
64 }()
65
66 received := 0
67 for _, err := range sub.MsgsTimeout(100 * time.Millisecond) {
68 if err != nil {
69 if !errors.Is(err, nats.ErrTimeout) {
70 t.Fatalf("Error on subscribe: %v", err)
71 }
72 break
73 } else {
74 received++
75 }
76 }
77 if received != total {
78 t.Fatalf("Expected %d messages, got %d", total, received)
79 }
80 })
81
82 t.Run("no timeout", func(t *testing.T) {
83 s := RunServerOnPort(-1)
84 defer s.Shutdown()
85
86 nc, err := nats.Connect(s.ClientURL(), nats.PermissionErrOnSubscribe(true))

Callers

nothing calls this directly

Calls 14

ConnectMethod · 0.80
FatalfMethod · 0.80
UnsubscribeMethod · 0.80
MsgsTimeoutMethod · 0.80
MsgsMethod · 0.80
NextMsgMethod · 0.80
RunServerOnPortFunction · 0.70
createConfFileFunction · 0.70
RunServerWithConfigFunction · 0.70
SubscribeSyncMethod · 0.65
PublishMethod · 0.65
SubscribeMethod · 0.65

Tested by

no test coverage detected