| 905 | var errLagged = errors.New("cannot guarantee complete results") |
| 906 | |
| 907 | func (s *LiveStore) isLagged(endNanos int64) bool { |
| 908 | if !s.cfg.ConsumeFromKafka { // if config disabled or no kafka consumption, never lagged |
| 909 | return false |
| 910 | } |
| 911 | lag := s.calculateTimeLag(0) |
| 912 | if lag == nil { // lag is unknown |
| 913 | return true // prefer error over potentially incomplete results |
| 914 | } |
| 915 | return time.Since(time.Unix(0, endNanos)) < *lag |
| 916 | } |
| 917 | |
| 918 | // withInstance extracts the tenant ID from the context, gets the instance, |
| 919 | // and calls the provided function if the instance exists. If the instance |