MCPcopy
hub / github.com/nats-io/nats.go / TestOrderedConsumerNext

Function TestOrderedConsumerNext

jetstream/test/ordered_test.go:1529–1679  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1527}
1528
1529func TestOrderedConsumerNext(t *testing.T) {
1530 testSubject := "FOO.123"
1531 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
1532 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
1533 for _, msg := range testMsgs {
1534 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
1535 t.Fatalf("Unexpected error during publish: %s", err)
1536 }
1537 }
1538 }
1539 t.Run("base usage, delete consumer", func(t *testing.T) {
1540 srv := RunBasicJetStreamServer()
1541 defer shutdownJSServerAndRemoveStorage(t, srv)
1542 nc, err := nats.Connect(srv.ClientURL())
1543 if err != nil {
1544 t.Fatalf("Unexpected error: %v", err)
1545 }
1546
1547 js, err := jetstream.New(nc)
1548 if err != nil {
1549 t.Fatalf("Unexpected error: %v", err)
1550 }
1551 defer nc.Close()
1552
1553 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
1554 defer cancel()
1555 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1556 if err != nil {
1557 t.Fatalf("Unexpected error: %v", err)
1558 }
1559 c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
1560 if err != nil {
1561 t.Fatalf("Unexpected error: %v", err)
1562 }
1563
1564 publishTestMsgs(t, js)
1565 _, err = c.Next()
1566 if err != nil {
1567 t.Fatalf("Unexpected error: %s", err)
1568 }
1569
1570 name := c.CachedInfo().Name
1571 if err := s.DeleteConsumer(ctx, name); err != nil {
1572 t.Fatal(err)
1573 }
1574 _, err = c.Next()
1575 if err != nil {
1576 t.Fatalf("Unexpected error: %s", err)
1577 }
1578 })
1579
1580 t.Run("consumer used as consume", func(t *testing.T) {
1581 srv := RunBasicJetStreamServer()
1582 defer shutdownJSServerAndRemoveStorage(t, srv)
1583 nc, err := nats.Connect(srv.ClientURL())
1584 if err != nil {
1585 t.Fatalf("Unexpected error: %v", err)
1586 }

Callers

nothing calls this directly

Calls 15

NewFunction · 0.92
FetchMaxWaitFunction · 0.92
FatalfMethod · 0.80
ConnectMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
PublishMethod · 0.65
CreateStreamMethod · 0.65
OrderedConsumerMethod · 0.65
NextMethod · 0.65
CachedInfoMethod · 0.65
DeleteConsumerMethod · 0.65

Tested by

no test coverage detected