MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamOrderedConsumerDeleteAssets

Function TestJetStreamOrderedConsumerDeleteAssets

test/js_test.go:10363–10488  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

10361}
10362
10363func TestJetStreamOrderedConsumerDeleteAssets(t *testing.T) {
10364 s := RunBasicJetStreamServer()
10365 defer shutdownJSServerAndRemoveStorage(t, s)
10366
10367 nc, js := jsClient(t, s)
10368 defer nc.Close()
10369
10370 var err error
10371
10372 // For capturing errors.
10373 errCh := make(chan error, 1)
10374 nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
10375 errCh <- err
10376 })
10377
10378 // Create a sample asset.
10379 mlen := 128 * 1024
10380 msg := make([]byte, mlen)
10381
10382 createStream := func() {
10383 t.Helper()
10384 _, err = js.AddStream(&nats.StreamConfig{
10385 Name: "OBJECT",
10386 Subjects: []string{"a"},
10387 Storage: nats.MemoryStorage,
10388 })
10389 if err != nil {
10390 t.Fatalf("Unexpected error: %v", err)
10391 }
10392
10393 // Now send into the stream as chunks.
10394 const chunkSize = 256
10395 for i := 0; i < mlen; i += chunkSize {
10396 var chunk []byte
10397 if mlen-i <= chunkSize {
10398 chunk = msg[i:]
10399 } else {
10400 chunk = msg[i : i+chunkSize]
10401 }
10402 js.PublishAsync("a", chunk)
10403 }
10404 select {
10405 case <-js.PublishAsyncComplete():
10406 case <-time.After(time.Second):
10407 t.Fatalf("Did not receive completion signal")
10408 }
10409 }
10410
10411 t.Run("remove stream, expect error", func(t *testing.T) {
10412 createStream()
10413
10414 sub, err := js.SubscribeSync("a", nats.OrderedConsumer(), nats.IdleHeartbeat(200*time.Millisecond))
10415 if err != nil {
10416 t.Fatalf("Unexpected error: %v", err)
10417 }
10418 defer sub.Unsubscribe()
10419
10420 // Since we are sync we will be paused here due to flow control.

Callers

nothing calls this directly

Calls 15

SetErrorHandler (Method) · 0.80
Fatalf (Method) · 0.80
Unsubscribe (Method) · 0.80
NextMsg (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
jsClient (Function) · 0.70
AddStream (Method) · 0.65
PublishAsync (Method) · 0.65
PublishAsyncComplete (Method) · 0.65
SubscribeSync (Method) · 0.65
OrderedConsumer (Method) · 0.65

Tested by

no test coverage detected