(t *testing.T)
| 2077 | } |
| 2078 | |
| 2079 | func TestJetStreamAckPending_Pull(t *testing.T) { |
| 2080 | s := RunBasicJetStreamServer() |
| 2081 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 2082 | |
| 2083 | nc, js := jsClient(t, s) |
| 2084 | defer nc.Close() |
| 2085 | |
| 2086 | var err error |
| 2087 | |
| 2088 | _, err = js.AddStream(&nats.StreamConfig{ |
| 2089 | Name: "TEST", |
| 2090 | Subjects: []string{"foo"}, |
| 2091 | }) |
| 2092 | if err != nil { |
| 2093 | t.Fatalf("Unexpected error: %v", err) |
| 2094 | } |
| 2095 | |
| 2096 | const totalMsgs = 4 |
| 2097 | for i := 0; i < totalMsgs; i++ { |
| 2098 | if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { |
| 2099 | t.Fatal(err) |
| 2100 | } |
| 2101 | } |
| 2102 | |
| 2103 | ackPendingLimit := 3 |
| 2104 | sub, err := js.PullSubscribe("foo", "dname-pull-ack-wait", nats.MaxAckPending(ackPendingLimit)) |
| 2105 | if err != nil { |
| 2106 | t.Fatal(err) |
| 2107 | } |
| 2108 | defer sub.Unsubscribe() |
| 2109 | |
| 2110 | var msgs []*nats.Msg |
| 2111 | for i := 0; i < ackPendingLimit; i++ { |
| 2112 | ms, err := sub.Fetch(1) |
| 2113 | if err != nil { |
| 2114 | t.Fatalf("Error on fetch: %v", err) |
| 2115 | } |
| 2116 | msgs = append(msgs, ms...) |
| 2117 | } |
| 2118 | |
| 2119 | // Since we don't ack, the next fetch should time out because the server |
| 2120 | // won't send new ones until we ack some. |
| 2121 | if _, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond)); err != nats.ErrTimeout { |
| 2122 | t.Fatalf("Expected timeout, got: %v", err) |
| 2123 | } |
| 2124 | // Ack one message, then we should be able to get the next |
| 2125 | msgs[0].Ack() |
| 2126 | if _, err := sub.Fetch(1); err != nil { |
| 2127 | t.Fatalf("Unexpected error: %v", err) |
| 2128 | } |
| 2129 | } |
| 2130 | |
| 2131 | func TestJetStreamAckPending_Push(t *testing.T) { |
| 2132 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected