MCPcopy
hub / github.com/nats-io/nats.go / TestOrderedConsumerCloseConn

Function TestOrderedConsumerCloseConn

jetstream/test/ordered_test.go:2231–2283  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2229}
2230
2231func TestOrderedConsumerCloseConn(t *testing.T) {
2232 srv := RunBasicJetStreamServer()
2233 defer shutdownJSServerAndRemoveStorage(t, srv)
2234 nc, err := nats.Connect(srv.ClientURL())
2235 if err != nil {
2236 t.Fatalf("Unexpected error: %v", err)
2237 }
2238
2239 js, err := jetstream.New(nc)
2240 if err != nil {
2241 t.Fatalf("Unexpected error: %v", err)
2242 }
2243 defer nc.Close()
2244
2245 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
2246 defer cancel()
2247 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
2248 if err != nil {
2249 t.Fatalf("Unexpected error: %v", err)
2250 }
2251 consumer, err := s.OrderedConsumer(context.Background(), jetstream.OrderedConsumerConfig{})
2252 if err != nil {
2253 t.Fatalf("Failed to create ordered consumer: %v", err)
2254 }
2255
2256 gotConnClosedErr := make(chan struct{})
2257
2258 time.AfterFunc(500*time.Millisecond, func() {
2259 nc.Close()
2260 })
2261
2262 oc, err := consumer.Consume(func(msg jetstream.Msg) {
2263 }, jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
2264 if errors.Is(err, jetstream.ErrConnectionClosed) {
2265 close(gotConnClosedErr)
2266 }
2267 }))
2268 if err != nil {
2269 t.Fatalf("Failed to consume: %v", err)
2270 }
2271
2272 select {
2273 case <-gotConnClosedErr:
2274 case <-time.After(2 * time.Second):
2275 t.Fatal("Timeout waiting for connection closed error")
2276 }
2277
2278 select {
2279 case <-oc.Closed():
2280 case <-time.After(2 * time.Second):
2281 t.Fatalf("Timeout waiting for close")
2282 }
2283}
2284
2285func TestOrderedConsumerCustomPrefix(t *testing.T) {
2286 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 12

NewFunction · 0.92
ConsumeErrHandlerTypeAlias · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
OrderedConsumerMethod · 0.65
ConsumeMethod · 0.65
ClosedMethod · 0.65
CloseMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected