(t *testing.T)
| 507 | } |
| 508 | |
| 509 | func TestPushConsumerConsume_WithQueue(t *testing.T) { |
| 510 | srv := RunBasicJetStreamServer() |
| 511 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 512 | nc, err := nats.Connect(srv.ClientURL()) |
| 513 | if err != nil { |
| 514 | t.Fatalf("Unexpected error: %v", err) |
| 515 | } |
| 516 | |
| 517 | js, err := jetstream.New(nc) |
| 518 | if err != nil { |
| 519 | t.Fatalf("Unexpected error: %v", err) |
| 520 | } |
| 521 | defer nc.Close() |
| 522 | |
| 523 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
| 524 | defer cancel() |
| 525 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 526 | if err != nil { |
| 527 | t.Fatalf("Unexpected error: %v", err) |
| 528 | } |
| 529 | c1, err := s.CreatePushConsumer(ctx, jetstream.ConsumerConfig{ |
| 530 | DeliverSubject: nats.NewInbox(), |
| 531 | DeliverGroup: "workers", |
| 532 | AckPolicy: jetstream.AckExplicitPolicy, |
| 533 | }) |
| 534 | if err != nil { |
| 535 | t.Fatalf("Unexpected error: %v", err) |
| 536 | } |
| 537 | |
| 538 | c2, err := s.PushConsumer(ctx, c1.CachedInfo().Name) |
| 539 | if err != nil { |
| 540 | t.Fatalf("Unexpected error: %v", err) |
| 541 | } |
| 542 | |
| 543 | msgs := make([]jetstream.Msg, 0) |
| 544 | lock := sync.Mutex{} |
| 545 | wg := &sync.WaitGroup{} |
| 546 | l1, err := c1.Consume(func(msg jetstream.Msg) { |
| 547 | msg.Ack() |
| 548 | lock.Lock() |
| 549 | msgs = append(msgs, msg) |
| 550 | lock.Unlock() |
| 551 | wg.Done() |
| 552 | }) |
| 553 | if err != nil { |
| 554 | t.Fatalf("Unexpected error: %v", err) |
| 555 | } |
| 556 | defer l1.Stop() |
| 557 | l2, err := c2.Consume(func(msg jetstream.Msg) { |
| 558 | msg.Ack() |
| 559 | lock.Lock() |
| 560 | msgs = append(msgs, msg) |
| 561 | lock.Unlock() |
| 562 | wg.Done() |
| 563 | }) |
| 564 | if err != nil { |
| 565 | t.Fatalf("Unexpected error: %v", err) |
| 566 | } |
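Nothing calls this function directly; as a Go `Test…(t *testing.T)` function it is invoked by the `go test` runner.
No test coverage detected (the function is itself a test).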