Need access to internals for loss testing.
(t *testing.T)
| 30 | |
| 31 | // Need access to internals for loss testing. |
| 32 | func TestJetStreamOrderedConsumer(t *testing.T) { |
| 33 | s := RunBasicJetStreamServer() |
| 34 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 35 | |
| 36 | nc, js := jsClient(t, s) |
| 37 | defer nc.Close() |
| 38 | |
| 39 | var err error |
| 40 | _, err = js.AddStream(&nats.StreamConfig{ |
| 41 | Name: "OBJECT", |
| 42 | Subjects: []string{"a"}, |
| 43 | Storage: nats.MemoryStorage, |
| 44 | }) |
| 45 | if err != nil { |
| 46 | t.Fatalf("Unexpected error: %v", err) |
| 47 | } |
| 48 | |
| 49 | // Will be used as start time to validate proper reset to sequence on retries. |
| 50 | startTime := time.Now() |
| 51 | |
| 52 | // Create a sample asset. |
| 53 | msg := make([]byte, 1024*1024) |
| 54 | rand.Read(msg) |
| 55 | msg = []byte(base64.StdEncoding.EncodeToString(msg)) |
| 56 | mlen, sum := len(msg), sha256.Sum256(msg) |
| 57 | |
| 58 | // Now send into the stream as chunks. |
| 59 | const chunkSize = 1024 |
| 60 | for i := 0; i < mlen; i += chunkSize { |
| 61 | var chunk []byte |
| 62 | if mlen-i <= chunkSize { |
| 63 | chunk = msg[i:] |
| 64 | } else { |
| 65 | chunk = msg[i : i+chunkSize] |
| 66 | } |
| 67 | msg := nats.NewMsg("a") |
| 68 | msg.Data = chunk |
| 69 | msg.Header.Set("data", "true") |
| 70 | js.PublishMsgAsync(msg) |
| 71 | } |
| 72 | js.PublishAsync("a", nil) // eof |
| 73 | |
| 74 | select { |
| 75 | case <-js.PublishAsyncComplete(): |
| 76 | case <-time.After(time.Second): |
| 77 | t.Fatalf("Did not receive completion signal") |
| 78 | } |
| 79 | |
| 80 | // Do some tests on simple misconfigurations first. |
| 81 | // For ordered delivery a couple of things need to be set properly. |
| 82 | // Can't be durable or have ack policy that is not ack none or max deliver set. |
| 83 | _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc")) |
| 84 | if err == nil || !strings.Contains(err.Error(), "ordered consumer") { |
| 85 | t.Fatalf("Expected an error, got %v", err) |
| 86 | } |
| 87 | |
| 88 | _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit()) |
| 89 | if err == nil || !strings.Contains(err.Error(), "ordered consumer") { |
nothing calls this directly
no test coverage detected