(t *testing.T)
| 1227 | } |
| 1228 | |
| 1229 | func TestPullSubscribeFetchWithHeartbeat(t *testing.T) { |
| 1230 | t.Skip("Since v2.10.26 server sends no responders if the consumer is deleted, we need to figure out how else to test missing heartbeats") |
| 1231 | s := RunBasicJetStreamServer() |
| 1232 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1233 | |
| 1234 | nc, js := jsClient(t, s) |
| 1235 | defer nc.Close() |
| 1236 | |
| 1237 | _, err := js.AddStream(&nats.StreamConfig{ |
| 1238 | Name: "TEST", |
| 1239 | Subjects: []string{"foo"}, |
| 1240 | }) |
| 1241 | if err != nil { |
| 1242 | t.Fatalf("Unexpected error: %v", err) |
| 1243 | } |
| 1244 | |
| 1245 | sub, err := js.PullSubscribe("foo", "") |
| 1246 | if err != nil { |
| 1247 | t.Fatalf("Unexpected error: %s", err) |
| 1248 | } |
| 1249 | defer sub.Unsubscribe() |
| 1250 | for i := 0; i < 5; i++ { |
| 1251 | if _, err := js.Publish("foo", []byte("msg")); err != nil { |
| 1252 | t.Fatalf("Unexpected error: %s", err) |
| 1253 | } |
| 1254 | } |
| 1255 | |
| 1256 | // fetch 5 messages, should finish immediately |
| 1257 | msgs, err := sub.Fetch(5, nats.PullHeartbeat(100*time.Millisecond)) |
| 1258 | if err != nil { |
| 1259 | t.Fatalf("Unexpected error: %s", err) |
| 1260 | } |
| 1261 | if len(msgs) != 5 { |
| 1262 | t.Fatalf("Expected %d messages; got: %d", 5, len(msgs)) |
| 1263 | } |
| 1264 | now := time.Now() |
| 1265 | // no messages available, should time out normally |
| 1266 | _, err = sub.Fetch(5, nats.PullHeartbeat(50*time.Millisecond), nats.MaxWait(300*time.Millisecond)) |
| 1267 | elapsed := time.Since(now) |
| 1268 | if elapsed < 300*time.Millisecond { |
| 1269 | t.Fatalf("Expected timeout after 300ms; got: %v", elapsed) |
| 1270 | } |
| 1271 | if !errors.Is(err, nats.ErrTimeout) { |
| 1272 | t.Fatalf("Expected timeout error; got: %v", err) |
| 1273 | } |
| 1274 | |
| 1275 | // delete consumer to verify heartbeats are not sent anymore |
| 1276 | info, err := sub.ConsumerInfo() |
| 1277 | if err != nil { |
| 1278 | t.Fatalf("Unexpected error: %v", err) |
| 1279 | } |
| 1280 | if err := js.DeleteConsumer("TEST", info.Name); err != nil { |
| 1281 | t.Fatalf("Unexpected error: %v", err) |
| 1282 | } |
| 1283 | _, err = sub.Fetch(5, nats.PullHeartbeat(100*time.Millisecond), nats.MaxWait(1*time.Second)) |
| 1284 | if !errors.Is(err, nats.ErrNoHeartbeat) { |
| 1285 | t.Fatalf("Expected no heartbeat error; got: %v", err) |
| 1286 | } |
nothing calls this directly
no test coverage detected