Watch for changes in the underlying store and receive meta information updates.
(opts ...WatchOpt)
| 1063 | |
| 1064 | // Watch for changes in the underlying store and receive meta information updates. |
| 1065 | func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { |
| 1066 | var o watchOpts |
| 1067 | for _, opt := range opts { |
| 1068 | if opt != nil { |
| 1069 | if err := opt.configureWatcher(&o); err != nil { |
| 1070 | return nil, err |
| 1071 | } |
| 1072 | } |
| 1073 | } |
| 1074 | |
| 1075 | var initDoneMarker bool |
| 1076 | |
| 1077 | w := &objWatcher{updates: make(chan *ObjectInfo, 32)} |
| 1078 | |
| 1079 | update := func(m *Msg) { |
| 1080 | var info ObjectInfo |
| 1081 | if err := json.Unmarshal(m.Data, &info); err != nil { |
| 1082 | return // TODO(dlc) - Communicate this upwards? |
| 1083 | } |
| 1084 | meta, err := m.Metadata() |
| 1085 | if err != nil { |
| 1086 | return |
| 1087 | } |
| 1088 | |
| 1089 | if !o.ignoreDeletes || !info.Deleted { |
| 1090 | info.ModTime = meta.Timestamp |
| 1091 | w.updates <- &info |
| 1092 | } |
| 1093 | |
| 1094 | // if UpdatesOnly is set, do not send nil to the channel |
| 1095 | // as it would always be triggered after initializing the watcher |
| 1096 | if !initDoneMarker && meta.NumPending == 0 { |
| 1097 | initDoneMarker = true |
| 1098 | w.updates <- nil |
| 1099 | } |
| 1100 | } |
| 1101 | |
| 1102 | allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name) |
| 1103 | _, err := obs.js.GetLastMsg(obs.stream, allMeta) |
| 1104 | // if there are no messages on the stream and we are not watching |
| 1105 | // updates only, send nil to the channel to indicate that the initial |
| 1106 | // watch is done |
| 1107 | if !o.updatesOnly { |
| 1108 | if errors.Is(err, ErrMsgNotFound) { |
| 1109 | initDoneMarker = true |
| 1110 | w.updates <- nil |
| 1111 | } |
| 1112 | } else { |
| 1113 | // if UpdatesOnly was used, mark initialization as complete |
| 1114 | initDoneMarker = true |
| 1115 | } |
| 1116 | |
| 1117 | // Use an ordered consumer to deliver results. |
| 1118 | streamName := fmt.Sprintf(objNameTmpl, obs.name) |
| 1119 | subOpts := []SubOpt{OrderedConsumer(), BindStream(streamName)} |
| 1120 | if !o.includeHistory { |
| 1121 | subOpts = append(subOpts, DeliverLastPerSubject()) |
| 1122 | } |
no test coverage detected