(t *testing.T)
| 225 | } |
| 226 | |
| 227 | func TestQueueSubscribeIterator(t *testing.T) { |
| 228 | t.Run("basic", func(t *testing.T) { |
| 229 | s := RunServerOnPort(-1) |
| 230 | defer s.Shutdown() |
| 231 | |
| 232 | nc, err := nats.Connect(s.ClientURL()) |
| 233 | if err != nil { |
| 234 | t.Fatalf("Error on connect: %v", err) |
| 235 | } |
| 236 | defer nc.Close() |
| 237 | |
| 238 | subs := make([]*nats.Subscription, 4) |
| 239 | for i := 0; i < 4; i++ { |
| 240 | sub, err := nc.QueueSubscribeSync("foo", "q") |
| 241 | if err != nil { |
| 242 | t.Fatal("Failed to subscribe: ", err) |
| 243 | } |
| 244 | subs[i] = sub |
| 245 | defer sub.Unsubscribe() |
| 246 | } |
| 247 | |
| 248 | // Send some messages to ourselves. |
| 249 | total := 100 |
| 250 | for i := 0; i < total; i++ { |
| 251 | if err := nc.Publish("foo", []byte(fmt.Sprintf("%d", i))); err != nil { |
| 252 | t.Fatalf("Error on publish: %v", err) |
| 253 | } |
| 254 | } |
| 255 | |
| 256 | wg := sync.WaitGroup{} |
| 257 | wg.Add(100) |
| 258 | startWg := sync.WaitGroup{} |
| 259 | startWg.Add(4) |
| 260 | |
| 261 | for i := range subs { |
| 262 | go func(i int) { |
| 263 | startWg.Done() |
| 264 | for _, err := range subs[i].MsgsTimeout(100 * time.Millisecond) { |
| 265 | if err != nil { |
| 266 | break |
| 267 | } |
| 268 | wg.Done() |
| 269 | } |
| 270 | }(i) |
| 271 | } |
| 272 | |
| 273 | startWg.Wait() |
| 274 | |
| 275 | wg.Wait() |
| 276 | |
| 277 | for _, sub := range subs { |
| 278 | if _, err = sub.NextMsg(100 * time.Millisecond); !errors.Is(err, nats.ErrTimeout) { |
| 279 | t.Fatalf("Expected timeout waiting for next message, got %v", err) |
| 280 | } |
| 281 | } |
| 282 | }) |
| 283 | |
| 284 | t.Run("permissions violation", func(t *testing.T) { |
nothing calls this directly
no test coverage detected