(ctx context.Context)
| 82 | } |
| 83 | |
| 84 | func (r *Rolluper) start(ctx context.Context) { |
| 85 | defer close(r.closed) |
| 86 | |
| 87 | do := func() { |
| 88 | var eg errgroup.Group |
| 89 | |
| 90 | r.logger.Debug(ctx, "rolling up data") |
| 91 | now := time.Now() |
| 92 | |
| 93 | // Track whether or not we performed a rollup (we got the advisory lock). |
| 94 | var ev Event |
| 95 | |
| 96 | eg.Go(func() error { |
| 97 | return r.db.InTx(func(tx database.Store) error { |
| 98 | // Acquire a lock to ensure that only one instance of |
| 99 | // the rollup is running at a time. |
| 100 | ok, err := tx.TryAcquireLock(ctx, database.LockIDDBRollup) |
| 101 | if err != nil { |
| 102 | return err |
| 103 | } |
| 104 | if !ok { |
| 105 | return nil |
| 106 | } |
| 107 | |
| 108 | ev.TemplateUsageStats = true |
| 109 | return tx.UpsertTemplateUsageStats(ctx) |
| 110 | }, database.DefaultTXOptions().WithID("db_rollup")) |
| 111 | }) |
| 112 | |
| 113 | err := eg.Wait() |
| 114 | if err != nil { |
| 115 | if database.IsQueryCanceledError(err) { |
| 116 | return |
| 117 | } |
| 118 | // Only log if Close hasn't been called. |
| 119 | if ctx.Err() == nil { |
| 120 | r.logger.Error(ctx, "failed to rollup data", slog.Error(err)) |
| 121 | } |
| 122 | return |
| 123 | } |
| 124 | |
| 125 | r.logger.Debug(ctx, |
| 126 | "rolled up data", |
| 127 | slog.F("took", time.Since(now)), |
| 128 | slog.F("event", ev), |
| 129 | ) |
| 130 | |
| 131 | // For testing. |
| 132 | if r.event != nil { |
| 133 | select { |
| 134 | case <-ctx.Done(): |
| 135 | return |
| 136 | case r.event <- ev: |
| 137 | } |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | // For testing. |
no test coverage detected