(t *testing.T)
| 10049 | } |
| 10050 | |
| 10051 | func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) { |
| 10052 | s := RunBasicJetStreamServer() |
| 10053 | |
| 10054 | // monitor for ErrConsumerNotActive error and suppress logging |
| 10055 | hbMissed := make(chan struct{}, 10) |
| 10056 | errHandler := func(c *nats.Conn, s *nats.Subscription, err error) { |
| 10057 | if !errors.Is(err, nats.ErrConsumerNotActive) { |
| 10058 | t.Fatalf("Unexpected error: %v", err) |
| 10059 | } |
| 10060 | hbMissed <- struct{}{} |
| 10061 | } |
| 10062 | nc, js := jsClient(t, s, nats.ErrorHandler(errHandler)) |
| 10063 | defer nc.Close() |
| 10064 | |
| 10065 | if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil { |
| 10066 | t.Fatalf("Unexpected error: %v", err) |
| 10067 | } |
| 10068 | sub, err := js.SubscribeSync("FOO.A", nats.OrderedConsumer(), nats.IdleHeartbeat(100*time.Millisecond)) |
| 10069 | if err != nil { |
| 10070 | t.Fatalf("Unexpected error: %v", err) |
| 10071 | } |
| 10072 | consInfo, err := sub.ConsumerInfo() |
| 10073 | if err != nil { |
| 10074 | t.Fatalf("Unexpected error: %v", err) |
| 10075 | } |
| 10076 | consName := consInfo.Name |
| 10077 | // validate that the generated name of the consumer is 8 |
| 10078 | // characters long (shorter than standard nuid) |
| 10079 | if len(consName) != 8 { |
| 10080 | t.Fatalf("Unexpected consumer name: %q", consName) |
| 10081 | } |
| 10082 | if _, err := js.Publish("FOO.A", []byte("msg 1")); err != nil { |
| 10083 | t.Fatalf("Unexpected error: %v", err) |
| 10084 | } |
| 10085 | msg, err := sub.NextMsg(2 * time.Second) |
| 10086 | if err != nil { |
| 10087 | t.Fatalf("Unexpected error: %v", err) |
| 10088 | } |
| 10089 | if string(msg.Data) != "msg 1" { |
| 10090 | t.Fatalf("Invalid msg value; want: 'msg 1'; got: %q", string(msg.Data)) |
| 10091 | } |
| 10092 | |
| 10093 | apiSub, err := nc.SubscribeSync("$JS.API.CONSUMER.*.>") |
| 10094 | if err != nil { |
| 10095 | t.Fatalf("Unexpected error: %v", err) |
| 10096 | } |
| 10097 | |
| 10098 | // restart the server |
| 10099 | s = restartBasicJSServer(t, s) |
| 10100 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 10101 | |
| 10102 | // wait until we miss heartbeat |
| 10103 | select { |
| 10104 | case <-hbMissed: |
| 10105 | case <-time.After(10 * time.Second): |
| 10106 | t.Fatalf("Did not receive consumer not active error") |
| 10107 | } |
| 10108 | consDeleteMsg, err := apiSub.NextMsg(2 * time.Second) |
nothing calls this directly
no test coverage detected