MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamOrderedConsumer

Function TestJetStreamOrderedConsumer

test/js_internal_test.go:32–228  ·  view source on GitHub ↗

Need access to internals for loss testing.

(t *testing.T)

Source from the content-addressed store, hash-verified

30
31// Need access to internals for loss testing.
32func TestJetStreamOrderedConsumer(t *testing.T) {
33 s := RunBasicJetStreamServer()
34 defer shutdownJSServerAndRemoveStorage(t, s)
35
36 nc, js := jsClient(t, s)
37 defer nc.Close()
38
39 var err error
40 _, err = js.AddStream(&nats.StreamConfig{
41 Name: "OBJECT",
42 Subjects: []string{"a"},
43 Storage: nats.MemoryStorage,
44 })
45 if err != nil {
46 t.Fatalf("Unexpected error: %v", err)
47 }
48
49 // Will be used as start time to validate proper reset to sequence on retries.
50 startTime := time.Now()
51
52 // Create a sample asset.
53 msg := make([]byte, 1024*1024)
54 rand.Read(msg)
55 msg = []byte(base64.StdEncoding.EncodeToString(msg))
56 mlen, sum := len(msg), sha256.Sum256(msg)
57
58 // Now send into the stream as chunks.
59 const chunkSize = 1024
60 for i := 0; i < mlen; i += chunkSize {
61 var chunk []byte
62 if mlen-i <= chunkSize {
63 chunk = msg[i:]
64 } else {
65 chunk = msg[i : i+chunkSize]
66 }
67 msg := nats.NewMsg("a")
68 msg.Data = chunk
69 msg.Header.Set("data", "true")
70 js.PublishMsgAsync(msg)
71 }
72 js.PublishAsync("a", nil) // eof
73
74 select {
75 case <-js.PublishAsyncComplete():
76 case <-time.After(time.Second):
77 t.Fatalf("Did not receive completion signal")
78 }
79
80 // Do some tests on simple misconfigurations first.
81 // For ordered delivery a couple of things need to be set properly.
82 // Can't be durable or have ack policy that is not ack none or max deliver set.
83 _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.Durable("dlc"))
84 if err == nil || !strings.Contains(err.Error(), "ordered consumer") {
85 t.Fatalf("Expected an error, got %v", err)
86 }
87
88 _, err = js.SubscribeSync("a", nats.OrderedConsumer(), nats.AckExplicit())
89 if err == nil || !strings.Contains(err.Error(), "ordered consumer") {

Callers

nothing calls this directly

Calls 15

Fatalf · Method · 0.80
Unsubscribe · Method · 0.80
NextMsg · Method · 0.80
RemoveMsgFilter · Method · 0.80
AddMsgFilter · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
Set · Method · 0.65
PublishMsgAsync · Method · 0.65
PublishAsync · Method · 0.65

Tested by

no test coverage detected