()
| 1076 | } |
| 1077 | |
| 1078 | func (s *pullSubscription) cleanup() { |
| 1079 | // For now this function does not need to hold the lock. |
| 1080 | // Holding the lock here might cause a deadlock if Next() |
| 1081 | // is already holding the lock and waiting. |
| 1082 | // The fields that are read (subscription, hbMonitor) |
| 1083 | // are read only (Only written on creation of pullSubscription). |
| 1084 | if s.subscription == nil || !s.subscription.IsValid() { |
| 1085 | return |
| 1086 | } |
| 1087 | if s.consumer != nil { |
| 1088 | nc := s.consumer.js.conn |
| 1089 | nc.RemoveStatusListener(s.connStatusChanged) |
| 1090 | } |
| 1091 | if s.hbMonitor != nil { |
| 1092 | s.hbMonitor.Stop() |
| 1093 | } |
| 1094 | drainMode := s.draining.Load() == 1 |
| 1095 | if drainMode { |
| 1096 | s.subscription.Drain() |
| 1097 | } else { |
| 1098 | s.subscription.Unsubscribe() |
| 1099 | } |
| 1100 | s.closed.Store(1) |
| 1101 | } |
| 1102 | |
| 1103 | // pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription]. |
| 1104 | // Messages will be fetched up to given batch_size or until there are no more messages or timeout is returned |
no test coverage detected