MCPcopy
hub / github.com/nats-io/nats.go / cleanup

Method cleanup

jetstream/pull.go:1078–1101  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

1076}
1077
1078func (s *pullSubscription) cleanup() {
1079 // For now this function does not need to hold the lock.
1080 // Holding the lock here might cause a deadlock if Next()
1081 // is already holding the lock and waiting.
1082 // The fields that are read (subscription, hbMonitor)
1083 // are read only (Only written on creation of pullSubscription).
1084 if s.subscription == nil || !s.subscription.IsValid() {
1085 return
1086 }
1087 if s.consumer != nil {
1088 nc := s.consumer.js.conn
1089 nc.RemoveStatusListener(s.connStatusChanged)
1090 }
1091 if s.hbMonitor != nil {
1092 s.hbMonitor.Stop()
1093 }
1094 drainMode := s.draining.Load() == 1
1095 if drainMode {
1096 s.subscription.Drain()
1097 } else {
1098 s.subscription.Unsubscribe()
1099 }
1100 s.closed.Store(1)
1101}
1102
1103// pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription].
1104// Messages will be fetched up to given batch_size or until there are no more messages or timeout is returned

Callers 1

pullMessagesMethod · 0.95

Calls 7

IsValidMethod · 0.80
RemoveStatusListenerMethod · 0.80
LoadMethod · 0.80
UnsubscribeMethod · 0.80
StoreMethod · 0.80
StopMethod · 0.65
DrainMethod · 0.65

Tested by

no test coverage detected