MCPcopy
hub / github.com/nats-io/nats.go / Watch

Method Watch

jetstream/object.go:1278–1349  ·  view source on GitHub ↗

Watch for changes in the underlying store and receive meta information updates.

(ctx context.Context, opts ...WatchOpt)

Source from the content-addressed store, hash-verified

1276
1277// Watch for changes in the underlying store and receive meta information updates.
1278func (obs *obs) Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, error) {
1279 var o watchOpts
1280 for _, opt := range opts {
1281 if opt != nil {
1282 if err := opt.configureWatcher(&o); err != nil {
1283 return nil, err
1284 }
1285 }
1286 }
1287
1288 var initDoneMarker bool
1289
1290 w := &objWatcher{updates: make(chan *ObjectInfo, 32)}
1291
1292 update := func(m *nats.Msg) {
1293 var info ObjectInfo
1294 if err := json.Unmarshal(m.Data, &info); err != nil {
1295 return // TODO(dlc) - Communicate this upwards?
1296 }
1297 meta, err := m.Metadata()
1298 if err != nil {
1299 return
1300 }
1301
1302 if !o.ignoreDeletes || !info.Deleted {
1303 info.ModTime = meta.Timestamp
1304 w.updates <- &info
1305 }
1306
1307 // if UpdatesOnly is set, do not send nil to the channel
1308 // as it would always be triggered after initializing the watcher
1309 if !initDoneMarker && meta.NumPending == 0 {
1310 initDoneMarker = true
1311 w.updates <- nil
1312 }
1313 }
1314
1315 allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name)
1316 _, err := obs.stream.GetLastMsgForSubject(ctx, allMeta)
1317 // if there are no messages on the stream and we are not watching
1318 // updates only, send nil to the channel to indicate that the initial
1319 // watch is done
1320 if !o.updatesOnly {
1321 if errors.Is(err, ErrMsgNotFound) {
1322 initDoneMarker = true
1323 w.updates <- nil
1324 }
1325 } else {
1326 // if UpdatesOnly was used, mark initialization as complete
1327 initDoneMarker = true
1328 }
1329
1330 // Use an ordered consumer to deliver results.
1331 streamName := fmt.Sprintf(objNameTmpl, obs.name)
1332 subOpts := []nats.SubOpt{nats.OrderedConsumer(), nats.BindStream(streamName)}
1333 if !o.includeHistory {
1334 subOpts = append(subOpts, nats.DeliverLastPerSubject())
1335 }

Callers 1

ListMethod · 0.95

Calls 8

configureWatcherMethod · 0.65
MetadataMethod · 0.65
GetLastMsgForSubjectMethod · 0.65
OrderedConsumerMethod · 0.65
ContextMethod · 0.65
SubscribeMethod · 0.65
IsMethod · 0.45
SetClosedHandlerMethod · 0.45

Tested by

no test coverage detected