MCP · copy
hub / github.com/nats-io/nats.go / TestJetStream_CreateConsumer

Function TestJetStream_CreateConsumer

jetstream/test/jetstream_test.go:1425–1530  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1423}
1424
1425func TestJetStream_CreateConsumer(t *testing.T) {
1426 tests := []struct {
1427 name string
1428 consumerConfig jetstream.ConsumerConfig
1429 shouldCreate bool
1430 stream string
1431 withError error
1432 }{
1433 {
1434 name: "create consumer",
1435 consumerConfig: jetstream.ConsumerConfig{Durable: "dur"},
1436 stream: "foo",
1437 shouldCreate: true,
1438 },
1439 {
1440 name: "consumer already exists, error",
1441 consumerConfig: jetstream.ConsumerConfig{Durable: "dur", Description: "test consumer"},
1442 stream: "foo",
1443 withError: jetstream.ErrConsumerExists,
1444 },
1445 {
1446 name: "stream does not exist",
1447 stream: "abc",
1448 consumerConfig: jetstream.ConsumerConfig{Durable: "dur"},
1449 withError: jetstream.ErrStreamNotFound,
1450 },
1451 {
1452 name: "invalid stream name",
1453 stream: "foo.1",
1454 withError: jetstream.ErrInvalidStreamName,
1455 },
1456 }
1457
1458 srv := RunBasicJetStreamServer()
1459 defer shutdownJSServerAndRemoveStorage(t, srv)
1460 nc, err := nats.Connect(srv.ClientURL())
1461 if err != nil {
1462 t.Fatalf("Unexpected error: %v", err)
1463 }
1464
1465 js, err := jetstream.New(nc)
1466 if err != nil {
1467 t.Fatalf("Unexpected error: %v", err)
1468 }
1469 defer nc.Close()
1470
1471 for _, consType := range []string{"pull", "push"} {
1472 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1473 defer cancel()
1474 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1475 if err != nil {
1476 t.Fatalf("Unexpected error: %v", err)
1477 }
1478
1479 for _, test := range tests {
1480 t.Run(fmt.Sprintf("%s %s", consType, test.name), func(t *testing.T) {
1481 var sub *nats.Subscription
1482 if test.consumerConfig.FilterSubject != "" {

Callers

nothing calls this directly

Calls 15

NextMsgWithContext · Method · 0.95
CachedInfo · Method · 0.95
New · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
SubscribeSync · Method · 0.65
CreateConsumer · Method · 0.65
CreatePushConsumer · Method · 0.65
Consumer · Method · 0.65

Tested by

no test coverage detected