PurgeDeletes will remove all current delete markers.
(ctx context.Context, opts ...KVPurgeOpt)
| 1497 | |
| 1498 | // PurgeDeletes will remove all current delete markers. |
| 1499 | func (kv *kvs) PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error { |
| 1500 | var o purgeOpts |
| 1501 | for _, opt := range opts { |
| 1502 | if opt != nil { |
| 1503 | if err := opt.configurePurge(&o); err != nil { |
| 1504 | return err |
| 1505 | } |
| 1506 | } |
| 1507 | } |
| 1508 | watcher, err := kv.WatchAll(ctx) |
| 1509 | if err != nil { |
| 1510 | return err |
| 1511 | } |
| 1512 | defer watcher.Stop() |
| 1513 | |
| 1514 | var limit time.Time |
| 1515 | olderThan := o.dmthr |
| 1516 | // Negative value is used to instruct to always remove markers, regardless |
| 1517 | // of age. If set to 0 (or not set), use our default value. |
| 1518 | if olderThan == 0 { |
| 1519 | olderThan = kvDefaultPurgeDeletesMarkerThreshold |
| 1520 | } |
| 1521 | if olderThan > 0 { |
| 1522 | limit = time.Now().Add(-olderThan) |
| 1523 | } |
| 1524 | |
| 1525 | var deleteMarkers []KeyValueEntry |
| 1526 | for entry := range watcher.Updates() { |
| 1527 | if entry == nil { |
| 1528 | break |
| 1529 | } |
| 1530 | if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge { |
| 1531 | deleteMarkers = append(deleteMarkers, entry) |
| 1532 | } |
| 1533 | } |
| 1534 | // Stop watcher here so as we purge we do not have the system continually updating numPending. |
| 1535 | watcher.Stop() |
| 1536 | |
| 1537 | var b strings.Builder |
| 1538 | // Do actual purges here. |
| 1539 | for _, entry := range deleteMarkers { |
| 1540 | b.WriteString(kv.pre) |
| 1541 | b.WriteString(entry.Key()) |
| 1542 | purgeOpts := []StreamPurgeOpt{WithPurgeSubject(b.String())} |
| 1543 | if olderThan > 0 && entry.Created().After(limit) { |
| 1544 | purgeOpts = append(purgeOpts, WithPurgeKeep(1)) |
| 1545 | } |
| 1546 | if err := kv.stream.Purge(ctx, purgeOpts...); err != nil { |
| 1547 | return err |
| 1548 | } |
| 1549 | b.Reset() |
| 1550 | } |
| 1551 | return nil |
| 1552 | } |
| 1553 | |
| 1554 | // Status retrieves the status and configuration of a bucket |
| 1555 | func (kv *kvs) Status(ctx context.Context) (KeyValueStatus, error) { |
nothing calls this directly
no test coverage detected