BatchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned channel. The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
(ctx context.Context, clock clockwork.Clock, input <-chan FileEvent)
| 31 | // |
| 32 | // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel. |
| 33 | func BatchDebounceEvents(ctx context.Context, clock clockwork.Clock, input <-chan FileEvent) <-chan []FileEvent { |
| 34 | out := make(chan []FileEvent) |
| 35 | go func() { |
| 36 | defer close(out) |
| 37 | seen := utils.Set[FileEvent]{} |
| 38 | flushEvents := func() { |
| 39 | if len(seen) == 0 { |
| 40 | return |
| 41 | } |
| 42 | logrus.Debugf("flush: %d events %s", len(seen), seen) |
| 43 | |
| 44 | events := make([]FileEvent, 0, len(seen)) |
| 45 | for e := range seen { |
| 46 | events = append(events, e) |
| 47 | } |
| 48 | out <- events |
| 49 | seen = utils.Set[FileEvent]{} |
| 50 | } |
| 51 | |
| 52 | t := clock.NewTicker(QuietPeriod) |
| 53 | defer t.Stop() |
| 54 | for { |
| 55 | select { |
| 56 | case <-ctx.Done(): |
| 57 | return |
| 58 | case <-t.Chan(): |
| 59 | flushEvents() |
| 60 | case e, ok := <-input: |
| 61 | if !ok { |
| 62 | // input channel was closed |
| 63 | flushEvents() |
| 64 | return |
| 65 | } |
| 66 | if _, ok := seen[e]; !ok { |
| 67 | seen.Add(e) |
| 68 | } |
| 69 | t.Reset(QuietPeriod) |
| 70 | } |
| 71 | } |
| 72 | }() |
| 73 | return out |
| 74 | } |