MCPcopy
hub / github.com/nats-io/nats.go / TestQueueSubscribeIterator

Function TestQueueSubscribeIterator

test/nats_iter_test.go:227–336  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

225}
226
227func TestQueueSubscribeIterator(t *testing.T) {
228 t.Run("basic", func(t *testing.T) {
229 s := RunServerOnPort(-1)
230 defer s.Shutdown()
231
232 nc, err := nats.Connect(s.ClientURL())
233 if err != nil {
234 t.Fatalf("Error on connect: %v", err)
235 }
236 defer nc.Close()
237
238 subs := make([]*nats.Subscription, 4)
239 for i := 0; i < 4; i++ {
240 sub, err := nc.QueueSubscribeSync("foo", "q")
241 if err != nil {
242 t.Fatal("Failed to subscribe: ", err)
243 }
244 subs[i] = sub
245 defer sub.Unsubscribe()
246 }
247
248 // Send some messages to ourselves.
249 total := 100
250 for i := 0; i < total; i++ {
251 if err := nc.Publish("foo", []byte(fmt.Sprintf("%d", i))); err != nil {
252 t.Fatalf("Error on publish: %v", err)
253 }
254 }
255
256 wg := sync.WaitGroup{}
257 wg.Add(100)
258 startWg := sync.WaitGroup{}
259 startWg.Add(4)
260
261 for i := range subs {
262 go func(i int) {
263 startWg.Done()
264 for _, err := range subs[i].MsgsTimeout(100 * time.Millisecond) {
265 if err != nil {
266 break
267 }
268 wg.Done()
269 }
270 }(i)
271 }
272
273 startWg.Wait()
274
275 wg.Wait()
276
277 for _, sub := range subs {
278 if _, err = sub.NextMsg(100 * time.Millisecond); !errors.Is(err, nats.ErrTimeout) {
279 t.Fatalf("Expected timeout waiting for next message, got %v", err)
280 }
281 }
282 })
283
284 t.Run("permissions violation", func(t *testing.T) {

Callers

nothing calls this directly

Calls 14

ConnectMethod · 0.80
FatalfMethod · 0.80
UnsubscribeMethod · 0.80
MsgsTimeoutMethod · 0.80
NextMsgMethod · 0.80
RunServerOnPortFunction · 0.70
createConfFileFunction · 0.70
RunServerWithConfigFunction · 0.70
QueueSubscribeSyncMethod · 0.65
PublishMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected