MCPcopy
hub / github.com/nats-io/nats.go / PurgeDeletes

Method PurgeDeletes

jetstream/kv.go:1499–1552  ·  view source on GitHub ↗

PurgeDeletes will remove all current delete markers.

(ctx context.Context, opts ...KVPurgeOpt)

Source from the content-addressed store, hash-verified

1497
1498// PurgeDeletes will remove all current delete markers.
1499func (kv *kvs) PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error {
1500 var o purgeOpts
1501 for _, opt := range opts {
1502 if opt != nil {
1503 if err := opt.configurePurge(&o); err != nil {
1504 return err
1505 }
1506 }
1507 }
1508 watcher, err := kv.WatchAll(ctx)
1509 if err != nil {
1510 return err
1511 }
1512 defer watcher.Stop()
1513
1514 var limit time.Time
1515 olderThan := o.dmthr
1516 // Negative value is used to instruct to always remove markers, regardless
1517 // of age. If set to 0 (or not set), use our default value.
1518 if olderThan == 0 {
1519 olderThan = kvDefaultPurgeDeletesMarkerThreshold
1520 }
1521 if olderThan > 0 {
1522 limit = time.Now().Add(-olderThan)
1523 }
1524
1525 var deleteMarkers []KeyValueEntry
1526 for entry := range watcher.Updates() {
1527 if entry == nil {
1528 break
1529 }
1530 if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge {
1531 deleteMarkers = append(deleteMarkers, entry)
1532 }
1533 }
1534 // Stop watcher here so as we purge we do not have the system continually updating numPending.
1535 watcher.Stop()
1536
1537 var b strings.Builder
1538 // Do actual purges here.
1539 for _, entry := range deleteMarkers {
1540 b.WriteString(kv.pre)
1541 b.WriteString(entry.Key())
1542 purgeOpts := []StreamPurgeOpt{WithPurgeSubject(b.String())}
1543 if olderThan > 0 && entry.Created().After(limit) {
1544 purgeOpts = append(purgeOpts, WithPurgeKeep(1))
1545 }
1546 if err := kv.stream.Purge(ctx, purgeOpts...); err != nil {
1547 return err
1548 }
1549 b.Reset()
1550 }
1551 return nil
1552}
1553
1554// Status retrieves the status and configuration of a bucket
1555func (kv *kvs) Status(ctx context.Context) (KeyValueStatus, error) {

Callers

nothing calls this directly

Calls 13

WatchAll · Method · 0.95
WithPurgeSubject · Function · 0.85
WithPurgeKeep · Function · 0.85
configurePurge · Method · 0.65
Stop · Method · 0.65
Add · Method · 0.65
Updates · Method · 0.65
Operation · Method · 0.65
Key · Method · 0.65
Created · Method · 0.65
Purge · Method · 0.65
Reset · Method · 0.65

Tested by

no test coverage detected