(t *testing.T, nodes ...*jsServer)
| 6623 | } |
| 6624 | |
| 6625 | func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { |
| 6626 | srvA := nodes[0] |
| 6627 | nc, js := jsClient(t, srvA.Server) |
| 6628 | defer nc.Close() |
| 6629 | |
| 6630 | var err error |
| 6631 | |
| 6632 | _, err = js.AddStream(&nats.StreamConfig{ |
| 6633 | Name: "origin", |
| 6634 | Placement: &nats.Placement{ |
| 6635 | Tags: []string{"NODE_0"}, |
| 6636 | }, |
| 6637 | Storage: nats.MemoryStorage, |
| 6638 | Replicas: 1, |
| 6639 | }) |
| 6640 | if err != nil { |
| 6641 | t.Fatalf("Unexpected error creating stream: %v", err) |
| 6642 | } |
| 6643 | |
| 6644 | totalMsgs := 10 |
| 6645 | for i := 0; i < totalMsgs; i++ { |
| 6646 | payload := fmt.Sprintf("i:%d", i) |
| 6647 | js.Publish("origin", []byte(payload)) |
| 6648 | } |
| 6649 | |
| 6650 | t.Run("create mirrors", func(t *testing.T) { |
| 6651 | _, err = js.AddStream(&nats.StreamConfig{ |
| 6652 | Name: "m1", |
| 6653 | Mirror: &nats.StreamSource{Name: "origin"}, |
| 6654 | Storage: nats.FileStorage, |
| 6655 | Replicas: 1, |
| 6656 | }) |
| 6657 | if err != nil { |
| 6658 | t.Fatalf("Unexpected error creating stream: %v", err) |
| 6659 | } |
| 6660 | |
| 6661 | _, err = js.AddStream(&nats.StreamConfig{ |
| 6662 | Name: "m2", |
| 6663 | Mirror: &nats.StreamSource{Name: "origin"}, |
| 6664 | Storage: nats.MemoryStorage, |
| 6665 | Replicas: 1, |
| 6666 | }) |
| 6667 | if err != nil { |
| 6668 | t.Fatalf("Unexpected error creating stream: %v", err) |
| 6669 | } |
| 6670 | msgs := make([]*nats.RawStreamMsg, 0) |
| 6671 | |
| 6672 | // Stored message sequences start at 1 |
| 6673 | startSequence := 1 |
| 6674 | |
| 6675 | GetNextMsg: |
| 6676 | for i := startSequence; i < totalMsgs+1; i++ { |
| 6677 | var ( |
| 6678 | err error |
| 6679 | seq = uint64(i) |
| 6680 | msgA *nats.RawStreamMsg |
| 6681 | msgB *nats.RawStreamMsg |
| 6682 | sourceMsg *nats.RawStreamMsg |
nothing calls this directly
no test coverage detected