MCPcopy
hub / github.com/nats-io/nats.go / TestKeyValueWatch

Function TestKeyValueWatch

jetstream/test/kv_test.go:271–700  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

269}
270
271func TestKeyValueWatch(t *testing.T) {
272 expectUpdateF := func(t *testing.T, watcher jetstream.KeyWatcher) func(key, value string, revision uint64) {
273 return func(key, value string, revision uint64) {
274 t.Helper()
275 select {
276 case v := <-watcher.Updates():
277 if v.Key() != key || string(v.Value()) != value || v.Revision() != revision {
278 t.Fatalf("Did not get expected: %q %q %d vs %q %q %d", v.Key(), string(v.Value()), v.Revision(), key, value, revision)
279 }
280 case <-time.After(time.Second):
281 t.Fatalf("Did not receive an update like expected")
282 }
283 }
284 }
285 expectDeleteF := func(t *testing.T, watcher jetstream.KeyWatcher) func(key string, revision uint64) {
286 return func(key string, revision uint64) {
287 t.Helper()
288 select {
289 case v := <-watcher.Updates():
290 if v.Operation() != jetstream.KeyValueDelete {
291 t.Fatalf("Expected a delete operation but got %+v", v)
292 }
293 if v.Revision() != revision {
294 t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision())
295 }
296 case <-time.After(time.Second):
297 t.Fatalf("Did not receive an update like expected")
298 }
299 }
300 }
301 expectPurgeF := func(t *testing.T, watcher jetstream.KeyWatcher) func(key string, revision uint64) {
302 return func(key string, revision uint64) {
303 t.Helper()
304 select {
305 case v := <-watcher.Updates():
306 if v.Operation() != jetstream.KeyValuePurge {
307 t.Fatalf("Expected a delete operation but got %+v", v)
308 }
309 if v.Revision() != revision {
310 t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision())
311 }
312 case <-time.After(time.Second):
313 t.Fatalf("Did not receive an update like expected")
314 }
315 }
316 }
317 expectInitDoneF := func(t *testing.T, watcher jetstream.KeyWatcher) func() {
318 return func() {
319 t.Helper()
320 select {
321 case v := <-watcher.Updates():
322 if v != nil {
323 t.Fatalf("Did not get expected: %+v", v)
324 }
325 case <-time.After(time.Second):
326 t.Fatalf("Did not receive a init done like expected")
327 }
328 }

Callers

nothing calls this directly

Calls 15

IncludeHistoryFunction · 0.92
UpdatesOnlyFunction · 0.92
ResumeFromRevisionFunction · 0.92
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
expectOkFunction · 0.70
expectErrFunction · 0.70
UpdatesMethod · 0.65
KeyMethod · 0.65
ValueMethod · 0.65

Tested by

no test coverage detected