MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamAckPending_Pull

Function TestJetStreamAckPending_Pull

test/js_test.go:2079–2129  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2077}
2078
2079func TestJetStreamAckPending_Pull(t *testing.T) {
2080 s := RunBasicJetStreamServer()
2081 defer shutdownJSServerAndRemoveStorage(t, s)
2082
2083 nc, js := jsClient(t, s)
2084 defer nc.Close()
2085
2086 var err error
2087
2088 _, err = js.AddStream(&nats.StreamConfig{
2089 Name: "TEST",
2090 Subjects: []string{"foo"},
2091 })
2092 if err != nil {
2093 t.Fatalf("Unexpected error: %v", err)
2094 }
2095
2096 const totalMsgs = 4
2097 for i := 0; i < totalMsgs; i++ {
2098 if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil {
2099 t.Fatal(err)
2100 }
2101 }
2102
2103 ackPendingLimit := 3
2104 sub, err := js.PullSubscribe("foo", "dname-pull-ack-wait", nats.MaxAckPending(ackPendingLimit))
2105 if err != nil {
2106 t.Fatal(err)
2107 }
2108 defer sub.Unsubscribe()
2109
2110 var msgs []*nats.Msg
2111 for i := 0; i < ackPendingLimit; i++ {
2112 ms, err := sub.Fetch(1)
2113 if err != nil {
2114 t.Fatalf("Error on fetch: %v", err)
2115 }
2116 msgs = append(msgs, ms...)
2117 }
2118
2119 // Since we don't ack, the next fetch should time out because the server
2120 // won't send new ones until we ack some.
2121 if _, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond)); err != nats.ErrTimeout {
2122 t.Fatalf("Expected timeout, got: %v", err)
2123 }
2124 // Ack one message, then we should be able to get the next
2125 msgs[0].Ack()
2126 if _, err := sub.Fetch(1); err != nil {
2127 t.Fatalf("Unexpected error: %v", err)
2128 }
2129}
2130
2131func TestJetStreamAckPending_Push(t *testing.T) {
2132 s := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 11

FatalfMethod · 0.80
UnsubscribeMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PublishMethod · 0.65
PullSubscribeMethod · 0.65
FetchMethod · 0.65
AckMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected