MCPcopy
hub / github.com/nats-io/nats.go / TestPushConsumerConsume_WithQueue

Function TestPushConsumerConsume_WithQueue

jetstream/test/push_test.go:509–582  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

507}
508
509func TestPushConsumerConsume_WithQueue(t *testing.T) {
510 srv := RunBasicJetStreamServer()
511 defer shutdownJSServerAndRemoveStorage(t, srv)
512 nc, err := nats.Connect(srv.ClientURL())
513 if err != nil {
514 t.Fatalf("Unexpected error: %v", err)
515 }
516
517 js, err := jetstream.New(nc)
518 if err != nil {
519 t.Fatalf("Unexpected error: %v", err)
520 }
521 defer nc.Close()
522
523 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
524 defer cancel()
525 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
526 if err != nil {
527 t.Fatalf("Unexpected error: %v", err)
528 }
529 c1, err := s.CreatePushConsumer(ctx, jetstream.ConsumerConfig{
530 DeliverSubject: nats.NewInbox(),
531 DeliverGroup: "workers",
532 AckPolicy: jetstream.AckExplicitPolicy,
533 })
534 if err != nil {
535 t.Fatalf("Unexpected error: %v", err)
536 }
537
538 c2, err := s.PushConsumer(ctx, c1.CachedInfo().Name)
539 if err != nil {
540 t.Fatalf("Unexpected error: %v", err)
541 }
542
543 msgs := make([]jetstream.Msg, 0)
544 lock := sync.Mutex{}
545 wg := &sync.WaitGroup{}
546 l1, err := c1.Consume(func(msg jetstream.Msg) {
547 msg.Ack()
548 lock.Lock()
549 msgs = append(msgs, msg)
550 lock.Unlock()
551 wg.Done()
552 })
553 if err != nil {
554 t.Fatalf("Unexpected error: %v", err)
555 }
556 defer l1.Stop()
557 l2, err := c2.Consume(func(msg jetstream.Msg) {
558 msg.Ack()
559 lock.Lock()
560 msgs = append(msgs, msg)
561 lock.Unlock()
562 wg.Done()
563 })
564 if err != nil {
565 t.Fatalf("Unexpected error: %v", err)
566 }

Callers

nothing calls this directly

Calls 15

New · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
NewInbox · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
CreatePushConsumer · Method · 0.65
PushConsumer · Method · 0.65
CachedInfo · Method · 0.65
Consume · Method · 0.65
Ack · Method · 0.65

Tested by

no test coverage detected