(t *testing.T)
| 5486 | } |
| 5487 | |
| 5488 | func TestJetStreamSubscribe_AckDup(t *testing.T) { |
| 5489 | s := RunBasicJetStreamServer() |
| 5490 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 5491 | |
| 5492 | nc, js := jsClient(t, s) |
| 5493 | defer nc.Close() |
| 5494 | |
| 5495 | var err error |
| 5496 | |
| 5497 | // Create the stream using our client API. |
| 5498 | _, err = js.AddStream(&nats.StreamConfig{ |
| 5499 | Name: "TEST", |
| 5500 | Subjects: []string{"foo"}, |
| 5501 | }) |
| 5502 | if err != nil { |
| 5503 | t.Fatalf("Unexpected error: %v", err) |
| 5504 | } |
| 5505 | |
| 5506 | js.Publish("foo", []byte("hello")) |
| 5507 | |
| 5508 | ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) |
| 5509 | defer cancel() |
| 5510 | |
| 5511 | pings := make(chan struct{}, 6) |
| 5512 | nc.Subscribe("$JS.ACK.TEST.>", func(msg *nats.Msg) { |
| 5513 | pings <- struct{}{} |
| 5514 | }) |
| 5515 | nc.Flush() |
| 5516 | |
| 5517 | ch := make(chan error, 6) |
| 5518 | _, err = js.Subscribe("foo", func(m *nats.Msg) { |
| 5519 | // Only first ack will be sent, auto ack that will occur after |
| 5520 | // this won't be sent either. |
| 5521 | ch <- m.Ack() |
| 5522 | |
| 5523 | // Any following acks should fail. |
| 5524 | ch <- m.Ack() |
| 5525 | ch <- m.Nak() |
| 5526 | ch <- m.AckSync() |
| 5527 | ch <- m.Term() |
| 5528 | ch <- m.InProgress() |
| 5529 | }) |
| 5530 | if err != nil { |
| 5531 | t.Fatalf("Unexpected error: %v", err) |
| 5532 | } |
| 5533 | <-ctx.Done() |
| 5534 | ackErr1 := <-ch |
| 5535 | if ackErr1 != nil { |
| 5536 | t.Errorf("Unexpected error: %v", ackErr1) |
| 5537 | } |
| 5538 | |
| 5539 | for i := 0; i < 5; i++ { |
| 5540 | e := <-ch |
| 5541 | if e != nats.ErrMsgAlreadyAckd { |
| 5542 | t.Errorf("Expected error: %v", e) |
| 5543 | } |
| 5544 | } |
| 5545 | if len(pings) != 1 { |
nothing calls this directly
no test coverage detected