(t *testing.T)
| 2280 | } |
| 2281 | |
| 2282 | func TestJetStream_Drain(t *testing.T) { |
| 2283 | s := RunBasicJetStreamServer() |
| 2284 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 2285 | |
| 2286 | ctx, done := context.WithTimeout(context.Background(), 10*time.Second) |
| 2287 | |
| 2288 | nc, err := nats.Connect(s.ClientURL(), nats.ClosedHandler(func(_ *nats.Conn) { |
| 2289 | done() |
| 2290 | })) |
| 2291 | if err != nil { |
| 2292 | t.Fatalf("Unexpected error: %v", err) |
| 2293 | } |
| 2294 | defer nc.Close() |
| 2295 | |
| 2296 | js, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond)) |
| 2297 | if err != nil { |
| 2298 | t.Fatalf("Unexpected error: %v", err) |
| 2299 | } |
| 2300 | _, err = js.AddStream(&nats.StreamConfig{ |
| 2301 | Name: "TEST", |
| 2302 | Subjects: []string{"drain"}, |
| 2303 | }) |
| 2304 | if err != nil { |
| 2305 | t.Fatalf("Unexpected error: %v", err) |
| 2306 | } |
| 2307 | |
| 2308 | total := 500 |
| 2309 | for i := 0; i < total; i++ { |
| 2310 | _, err := js.Publish("drain", []byte(fmt.Sprintf("i:%d", i))) |
| 2311 | if err != nil { |
| 2312 | t.Error(err) |
| 2313 | } |
| 2314 | } |
| 2315 | |
| 2316 | // Create some consumers and ensure that there are no timeouts. |
| 2317 | errCh := make(chan error, 2048) |
| 2318 | createSub := func(name string) (*nats.Subscription, error) { |
| 2319 | return js.Subscribe("drain", func(m *nats.Msg) { |
| 2320 | err := m.AckSync() |
| 2321 | if err != nil { |
| 2322 | errCh <- err |
| 2323 | } |
| 2324 | }, nats.Durable(name), nats.ManualAck()) |
| 2325 | } |
| 2326 | |
| 2327 | subA, err := createSub("A") |
| 2328 | if err != nil { |
| 2329 | t.Fatalf("Unexpected error: %v", err) |
| 2330 | } |
| 2331 | |
| 2332 | subB, err := createSub("B") |
| 2333 | if err != nil { |
| 2334 | t.Fatalf("Unexpected error: %v", err) |
| 2335 | } |
| 2336 | |
| 2337 | subC, err := createSub("C") |
| 2338 | if err != nil { |
| 2339 | t.Fatalf("Unexpected error: %v", err) |
nothing calls this directly
no test coverage detected