(t *testing.T)
| 10361 | } |
| 10362 | |
| 10363 | func TestJetStreamOrderedConsumerDeleteAssets(t *testing.T) { |
| 10364 | s := RunBasicJetStreamServer() |
| 10365 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 10366 | |
| 10367 | nc, js := jsClient(t, s) |
| 10368 | defer nc.Close() |
| 10369 | |
| 10370 | var err error |
| 10371 | |
| 10372 | // For capturing errors. |
| 10373 | errCh := make(chan error, 1) |
| 10374 | nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { |
| 10375 | errCh <- err |
| 10376 | }) |
| 10377 | |
| 10378 | // Create a sample asset. |
| 10379 | mlen := 128 * 1024 |
| 10380 | msg := make([]byte, mlen) |
| 10381 | |
| 10382 | createStream := func() { |
| 10383 | t.Helper() |
| 10384 | _, err = js.AddStream(&nats.StreamConfig{ |
| 10385 | Name: "OBJECT", |
| 10386 | Subjects: []string{"a"}, |
| 10387 | Storage: nats.MemoryStorage, |
| 10388 | }) |
| 10389 | if err != nil { |
| 10390 | t.Fatalf("Unexpected error: %v", err) |
| 10391 | } |
| 10392 | |
| 10393 | // Now send into the stream as chunks. |
| 10394 | const chunkSize = 256 |
| 10395 | for i := 0; i < mlen; i += chunkSize { |
| 10396 | var chunk []byte |
| 10397 | if mlen-i <= chunkSize { |
| 10398 | chunk = msg[i:] |
| 10399 | } else { |
| 10400 | chunk = msg[i : i+chunkSize] |
| 10401 | } |
| 10402 | js.PublishAsync("a", chunk) |
| 10403 | } |
| 10404 | select { |
| 10405 | case <-js.PublishAsyncComplete(): |
| 10406 | case <-time.After(time.Second): |
| 10407 | t.Fatalf("Did not receive completion signal") |
| 10408 | } |
| 10409 | } |
| 10410 | |
| 10411 | t.Run("remove stream, expect error", func(t *testing.T) { |
| 10412 | createStream() |
| 10413 | |
| 10414 | sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(200*time.Millisecond)) |
| 10415 | if err != nil { |
| 10416 | t.Fatalf("Unexpected error: %v", err) |
| 10417 | } |
| 10418 | defer sub.Unsubscribe() |
| 10419 | |
| 10420 | // Since we are sync we will be paused here due to flow control. |
nothing calls this directly
no test coverage detected