Watch for changes in the underlying store and receive meta information updates.
(ctx context.Context, opts ...WatchOpt)
| 1276 | |
| 1277 | // Watch for changes in the underlying store and receive meta information updates. |
| 1278 | func (obs *obs) Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, error) { |
| 1279 | var o watchOpts |
| 1280 | for _, opt := range opts { |
| 1281 | if opt != nil { |
| 1282 | if err := opt.configureWatcher(&o); err != nil { |
| 1283 | return nil, err |
| 1284 | } |
| 1285 | } |
| 1286 | } |
| 1287 | |
| 1288 | var initDoneMarker bool |
| 1289 | |
| 1290 | w := &objWatcher{updates: make(chan *ObjectInfo, 32)} |
| 1291 | |
| 1292 | update := func(m *nats.Msg) { |
| 1293 | var info ObjectInfo |
| 1294 | if err := json.Unmarshal(m.Data, &info); err != nil { |
| 1295 | return // TODO(dlc) - Communicate this upwards? |
| 1296 | } |
| 1297 | meta, err := m.Metadata() |
| 1298 | if err != nil { |
| 1299 | return |
| 1300 | } |
| 1301 | |
| 1302 | if !o.ignoreDeletes || !info.Deleted { |
| 1303 | info.ModTime = meta.Timestamp |
| 1304 | w.updates <- &info |
| 1305 | } |
| 1306 | |
| 1307 | // if UpdatesOnly is set, do not send nil to the channel |
| 1308 | // as it would always be triggered after initializing the watcher |
| 1309 | if !initDoneMarker && meta.NumPending == 0 { |
| 1310 | initDoneMarker = true |
| 1311 | w.updates <- nil |
| 1312 | } |
| 1313 | } |
| 1314 | |
| 1315 | allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name) |
| 1316 | _, err := obs.stream.GetLastMsgForSubject(ctx, allMeta) |
| 1317 | // if there are no messages on the stream and we are not watching |
| 1318 | // updates only, send nil to the channel to indicate that the initial |
| 1319 | // watch is done |
| 1320 | if !o.updatesOnly { |
| 1321 | if errors.Is(err, ErrMsgNotFound) { |
| 1322 | initDoneMarker = true |
| 1323 | w.updates <- nil |
| 1324 | } |
| 1325 | } else { |
| 1326 | // if UpdatesOnly was used, mark initialization as complete |
| 1327 | initDoneMarker = true |
| 1328 | } |
| 1329 | |
| 1330 | // Use an ordered consumer to deliver results. |
| 1331 | streamName := fmt.Sprintf(objNameTmpl, obs.name) |
| 1332 | subOpts := []nats.SubOpt{nats.OrderedConsumer(), nats.BindStream(streamName)} |
| 1333 | if !o.includeHistory { |
| 1334 | subOpts = append(subOpts, nats.DeliverLastPerSubject()) |
| 1335 | } |
no test coverage detected