MCPcopy
hub / github.com/nats-io/nats.go / Watch

Method Watch

object.go:1065–1136  ·  view source on GitHub ↗

Watch for changes in the underlying store and receive meta information updates.

(opts ...WatchOpt)

Source from the content-addressed store, hash-verified

1063
1064// Watch for changes in the underlying store and receive meta information updates.
1065func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
1066 var o watchOpts
1067 for _, opt := range opts {
1068 if opt != nil {
1069 if err := opt.configureWatcher(&o); err != nil {
1070 return nil, err
1071 }
1072 }
1073 }
1074
1075 var initDoneMarker bool
1076
1077 w := &objWatcher{updates: make(chan *ObjectInfo, 32)}
1078
1079 update := func(m *Msg) {
1080 var info ObjectInfo
1081 if err := json.Unmarshal(m.Data, &info); err != nil {
1082 return // TODO(dlc) - Communicate this upwards?
1083 }
1084 meta, err := m.Metadata()
1085 if err != nil {
1086 return
1087 }
1088
1089 if !o.ignoreDeletes || !info.Deleted {
1090 info.ModTime = meta.Timestamp
1091 w.updates <- &info
1092 }
1093
1094 // if UpdatesOnly is set, do not send nil to the channel
1095 // as it would always be triggered after initializing the watcher
1096 if !initDoneMarker && meta.NumPending == 0 {
1097 initDoneMarker = true
1098 w.updates <- nil
1099 }
1100 }
1101
1102 allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name)
1103 _, err := obs.js.GetLastMsg(obs.stream, allMeta)
1104 // if there are no messages on the stream and we are not watching
1105 // updates only, send nil to the channel to indicate that the initial
1106 // watch is done
1107 if !o.updatesOnly {
1108 if errors.Is(err, ErrMsgNotFound) {
1109 initDoneMarker = true
1110 w.updates <- nil
1111 }
1112 } else {
1113 // if UpdatesOnly was used, mark initialization as complete
1114 initDoneMarker = true
1115 }
1116
1117 // Use an ordered consumer to deliver results.
1118 streamName := fmt.Sprintf(objNameTmpl, obs.name)
1119 subOpts := []SubOpt{OrderedConsumer(), BindStream(streamName)}
1120 if !o.includeHistory {
1121 subOpts = append(subOpts, DeliverLastPerSubject())
1122 }

Callers 1

ListMethod · 0.95

Calls 9

OrderedConsumerFunction · 0.85
BindStreamFunction · 0.85
DeliverLastPerSubjectFunction · 0.85
DeliverNewFunction · 0.85
configureWatcherMethod · 0.65
MetadataMethod · 0.65
GetLastMsgMethod · 0.65
SubscribeMethod · 0.65
IsMethod · 0.45

Tested by

no test coverage detected