MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamDrainFailsToDeleteConsumer

Function TestJetStreamDrainFailsToDeleteConsumer

test/js_test.go:9144–9202  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

9142}
9143
9144func TestJetStreamDrainFailsToDeleteConsumer(t *testing.T) {
9145 s := RunBasicJetStreamServer()
9146 defer shutdownJSServerAndRemoveStorage(t, s)
9147
9148 errCh := make(chan error, 1)
9149 nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
9150 select {
9151 case errCh <- err:
9152 default:
9153 }
9154 }))
9155 if err != nil {
9156 t.Fatalf("Unexpected error: %v", err)
9157 }
9158 defer nc.Close()
9159
9160 js, err := nc.JetStream()
9161 if err != nil {
9162 t.Fatalf("Unexpected error: %v", err)
9163 }
9164
9165 if _, err := js.AddStream(&nats.StreamConfig{
9166 Name: "TEST",
9167 Subjects: []string{"foo"},
9168 }); err != nil {
9169 t.Fatalf("Unexpected error: %v", err)
9170 }
9171
9172 js.Publish("foo", []byte("hi"))
9173
9174 blockCh := make(chan struct{})
9175 sub, err := js.Subscribe("foo", func(m *nats.Msg) {
9176 <-blockCh
9177 }, nats.Durable("dur"))
9178 if err != nil {
9179 t.Fatalf("Error subscribing: %v", err)
9180 }
9181
9182 // Initiate the drain... it won't complete because we have blocked the
9183 // message callback.
9184 sub.Drain()
9185
9186 // Now delete the JS consumer
9187 if err := js.DeleteConsumer("TEST", "dur"); err != nil {
9188 t.Fatalf("Error deleting consumer: %v", err)
9189 }
9190
9191 // Now unblock and make sure we get the async error
9192 close(blockCh)
9193
9194 select {
9195 case err := <-errCh:
9196 if !strings.Contains(err.Error(), "consumer not found") {
9197 t.Fatalf("Unexpected async error: %v", err)
9198 }
9199 case <-time.After(time.Second):
9200 t.Fatal("Did not get async error")
9201 }

Callers

nothing calls this directly

Calls 13

ConnectMethod · 0.80
ErrorHandlerMethod · 0.80
FatalfMethod · 0.80
JetStreamMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
AddStreamMethod · 0.65
PublishMethod · 0.65
SubscribeMethod · 0.65
DrainMethod · 0.65
DeleteConsumerMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected