MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamOrderedConsumerRecreateAfterReconnect

Function TestJetStreamOrderedConsumerRecreateAfterReconnect

test/js_test.go:10051–10153  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

10049}
10050
10051func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
10052 s := RunBasicJetStreamServer()
10053
10054 // monitor for ErrConsumerNotActive error and suppress logging
10055 hbMissed := make(chan struct{}, 10)
10056 errHandler := func(c *nats.Conn, s *nats.Subscription, err error) {
10057 if !errors.Is(err, nats.ErrConsumerNotActive) {
10058 t.Fatalf("Unexpected error: %v", err)
10059 }
10060 hbMissed <- struct{}{}
10061 }
10062 nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
10063 defer nc.Close()
10064
10065 if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {
10066 t.Fatalf("Unexpected error: %v", err)
10067 }
10068 sub, err := js.SubscribeSync("FOO.A", nats.OrderedConsumer(), nats.IdleHeartbeat(100*time.Millisecond))
10069 if err != nil {
10070 t.Fatalf("Unexpected error: %v", err)
10071 }
10072 consInfo, err := sub.ConsumerInfo()
10073 if err != nil {
10074 t.Fatalf("Unexpected error: %v", err)
10075 }
10076 consName := consInfo.Name
10077 // validate that the generated name of the consumer is 8
10078 // characters long (shorter than standard nuid)
10079 if len(consName) != 8 {
10080 t.Fatalf("Unexpected consumer name: %q", consName)
10081 }
10082 if _, err := js.Publish("FOO.A", []byte("msg 1")); err != nil {
10083 t.Fatalf("Unexpected error: %v", err)
10084 }
10085 msg, err := sub.NextMsg(2 * time.Second)
10086 if err != nil {
10087 t.Fatalf("Unexpected error: %v", err)
10088 }
10089 if string(msg.Data) != "msg 1" {
10090 t.Fatalf("Invalid msg value; want: 'msg 1'; got: %q", string(msg.Data))
10091 }
10092
10093 apiSub, err := nc.SubscribeSync("$JS.API.CONSUMER.*.>")
10094 if err != nil {
10095 t.Fatalf("Unexpected error: %v", err)
10096 }
10097
10098 // restart the server
10099 s = restartBasicJSServer(t, s)
10100 defer shutdownJSServerAndRemoveStorage(t, s)
10101
10102 // wait until we miss heartbeat
10103 select {
10104 case <-hbMissed:
10105 case <-time.After(10 * time.Second):
10106 t.Fatalf("Did not receive consumer not active error")
10107 }
10108 consDeleteMsg, err := apiSub.NextMsg(2 * time.Second)

Callers

nothing calls this directly

Calls 14

FatalfMethod · 0.80
ErrorHandlerMethod · 0.80
NextMsgMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
restartBasicJSServerFunction · 0.70
AddStreamMethod · 0.65
SubscribeSyncMethod · 0.65
OrderedConsumerMethod · 0.65
ConsumerInfoMethod · 0.65
PublishMethod · 0.65

Tested by

no test coverage detected