MCPcopy
hub / github.com/nats-io/nats.go / TestPullSubscribeFetchWithHeartbeat

Function TestPullSubscribeFetchWithHeartbeat

test/js_test.go:1229–1337  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1227}
1228
1229func TestPullSubscribeFetchWithHeartbeat(t *testing.T) {
1230 t.Skip("Since v2.10.26 server sends no responders if the consumer is deleted, we need to figure out how else to test missing heartbeats")
1231 s := RunBasicJetStreamServer()
1232 defer shutdownJSServerAndRemoveStorage(t, s)
1233
1234 nc, js := jsClient(t, s)
1235 defer nc.Close()
1236
1237 _, err := js.AddStream(&nats.StreamConfig{
1238 Name: "TEST",
1239 Subjects: []string{"foo"},
1240 })
1241 if err != nil {
1242 t.Fatalf("Unexpected error: %v", err)
1243 }
1244
1245 sub, err := js.PullSubscribe("foo", "")
1246 if err != nil {
1247 t.Fatalf("Unexpected error: %s", err)
1248 }
1249 defer sub.Unsubscribe()
1250 for i := 0; i < 5; i++ {
1251 if _, err := js.Publish("foo", []byte("msg")); err != nil {
1252 t.Fatalf("Unexpected error: %s", err)
1253 }
1254 }
1255
1256 // fetch 5 messages, should finish immediately
1257 msgs, err := sub.Fetch(5, nats.PullHeartbeat(100*time.Millisecond))
1258 if err != nil {
1259 t.Fatalf("Unexpected error: %s", err)
1260 }
1261 if len(msgs) != 5 {
1262 t.Fatalf("Expected %d messages; got: %d", 5, len(msgs))
1263 }
1264 now := time.Now()
1265 // no messages available, should time out normally
1266 _, err = sub.Fetch(5, nats.PullHeartbeat(50*time.Millisecond), nats.MaxWait(300*time.Millisecond))
1267 elapsed := time.Since(now)
1268 if elapsed < 300*time.Millisecond {
1269 t.Fatalf("Expected timeout after 300ms; got: %v", elapsed)
1270 }
1271 if !errors.Is(err, nats.ErrTimeout) {
1272 t.Fatalf("Expected timeout error; got: %v", err)
1273 }
1274
1275 // delete consumer to verify heartbeats are not sent anymore
1276 info, err := sub.ConsumerInfo()
1277 if err != nil {
1278 t.Fatalf("Unexpected error: %v", err)
1279 }
1280 if err := js.DeleteConsumer("TEST", info.Name); err != nil {
1281 t.Fatalf("Unexpected error: %v", err)
1282 }
1283 _, err = sub.Fetch(5, nats.PullHeartbeat(100*time.Millisecond), nats.MaxWait(1*time.Second))
1284 if !errors.Is(err, nats.ErrNoHeartbeat) {
1285 t.Fatalf("Expected no heartbeat error; got: %v", err)
1286 }

Callers

nothing calls this directly

Calls 15

Fatalf (Method) · 0.80
Unsubscribe (Method) · 0.80
JetStream (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
jsClient (Function) · 0.70
AddStream (Method) · 0.65
PullSubscribe (Method) · 0.65
Publish (Method) · 0.65
Fetch (Method) · 0.65
ConsumerInfo (Method) · 0.65
DeleteConsumer (Method) · 0.65

Tested by

no test coverage detected