MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamSubscribe_AckDupInProgress

Function TestJetStreamSubscribe_AckDupInProgress

test/js_test.go:5622–5677  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

5620}
5621
5622func TestJetStreamSubscribe_AckDupInProgress(t *testing.T) {
5623 s := RunBasicJetStreamServer()
5624 defer shutdownJSServerAndRemoveStorage(t, s)
5625
5626 nc, js := jsClient(t, s)
5627 defer nc.Close()
5628
5629 var err error
5630
5631 // Create the stream using our client API.
5632 _, err = js.AddStream(&nats.StreamConfig{
5633 Name: "TEST",
5634 Subjects: []string{"foo"},
5635 })
5636 if err != nil {
5637 t.Fatalf("Unexpected error: %v", err)
5638 }
5639
5640 js.Publish("foo", []byte("hello"))
5641
5642 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
5643 defer cancel()
5644
5645 pings := make(chan struct{}, 3)
5646 nc.Subscribe("$JS.ACK.TEST.>", func(msg *nats.Msg) {
5647 pings <- struct{}{}
5648 })
5649 nc.Flush()
5650
5651 ch := make(chan error, 3)
5652 _, err = js.Subscribe("foo", func(m *nats.Msg) {
5653 // InProgress ACK can be sent any number of times.
5654 ch <- m.InProgress()
5655 ch <- m.InProgress()
5656 ch <- m.Ack()
5657 }, nats.Durable("WQ"), nats.ManualAck())
5658 if err != nil {
5659 t.Fatalf("Unexpected error: %v", err)
5660 }
5661 <-ctx.Done()
5662 ackErr1 := <-ch
5663 ackErr2 := <-ch
5664 ackErr3 := <-ch
5665 if ackErr1 != nil {
5666 t.Errorf("Unexpected error: %v", ackErr1)
5667 }
5668 if ackErr2 != nil {
5669 t.Errorf("Unexpected error: %v", ackErr2)
5670 }
5671 if ackErr3 != nil {
5672 t.Errorf("Unexpected error: %v", ackErr3)
5673 }
5674 if len(pings) != 3 {
5675 t.Logf("Expected to receive multiple acks, got: %v", len(pings))
5676 }
5677}
5678
5679func TestJetStream_Unsubscribe(t *testing.T) {

Callers

nothing calls this directly

Calls 13

FatalfMethod · 0.80
ErrorfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PublishMethod · 0.65
SubscribeMethod · 0.65
InProgressMethod · 0.65
AckMethod · 0.65
DoneMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected