(t *testing.T)
| 5620 | } |
| 5621 | |
| 5622 | func TestJetStreamSubscribe_AckDupInProgress(t *testing.T) { |
| 5623 | s := RunBasicJetStreamServer() |
| 5624 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 5625 | |
| 5626 | nc, js := jsClient(t, s) |
| 5627 | defer nc.Close() |
| 5628 | |
| 5629 | var err error |
| 5630 | |
| 5631 | // Create the stream using our client API. |
| 5632 | _, err = js.AddStream(&nats.StreamConfig{ |
| 5633 | Name: "TEST", |
| 5634 | Subjects: []string{"foo"}, |
| 5635 | }) |
| 5636 | if err != nil { |
| 5637 | t.Fatalf("Unexpected error: %v", err) |
| 5638 | } |
| 5639 | |
| 5640 | js.Publish("foo", []byte("hello")) |
| 5641 | |
| 5642 | ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) |
| 5643 | defer cancel() |
| 5644 | |
| 5645 | pings := make(chan struct{}, 3) |
| 5646 | nc.Subscribe("$JS.ACK.TEST.>", func(msg *nats.Msg) { |
| 5647 | pings <- struct{}{} |
| 5648 | }) |
| 5649 | nc.Flush() |
| 5650 | |
| 5651 | ch := make(chan error, 3) |
| 5652 | _, err = js.Subscribe("foo", func(m *nats.Msg) { |
| 5653 | // InProgress ACK can be sent any number of times. |
| 5654 | ch <- m.InProgress() |
| 5655 | ch <- m.InProgress() |
| 5656 | ch <- m.Ack() |
| 5657 | }, nats.Durable("WQ"), nats.ManualAck()) |
| 5658 | if err != nil { |
| 5659 | t.Fatalf("Unexpected error: %v", err) |
| 5660 | } |
| 5661 | <-ctx.Done() |
| 5662 | ackErr1 := <-ch |
| 5663 | ackErr2 := <-ch |
| 5664 | ackErr3 := <-ch |
| 5665 | if ackErr1 != nil { |
| 5666 | t.Errorf("Unexpected error: %v", ackErr1) |
| 5667 | } |
| 5668 | if ackErr2 != nil { |
| 5669 | t.Errorf("Unexpected error: %v", ackErr2) |
| 5670 | } |
| 5671 | if ackErr3 != nil { |
| 5672 | t.Errorf("Unexpected error: %v", ackErr3) |
| 5673 | } |
| 5674 | if len(pings) != 3 { |
| 5675 | t.Logf("Expected to receive multiple acks, got: %v", len(pings)) |
| 5676 | } |
| 5677 | } |
| 5678 | |
| 5679 | func TestJetStream_Unsubscribe(t *testing.T) { |
nothing calls this directly
no test coverage detected