TestBackpressure validates that delays in processing the buffered updates result in slower dequeue rates. As a side effect, it also tests the graceful shutdown and flushing of the buffers.
(t *testing.T)
| 267 | // TestBackpressure validates that delays in processing the buffered updates will result in slowed dequeue rates. |
| 268 | // As a side-effect, this also tests the graceful shutdown and flushing of the buffers. |
| 269 | func TestBackpressure(t *testing.T) { |
| 270 | t.Parallel() |
| 271 | |
| 272 | store, pubsub := dbtestutil.NewDB(t) |
| 273 | logger := testutil.Logger(t) |
| 274 | ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort)) |
| 275 | |
| 276 | const method = database.NotificationMethodWebhook |
| 277 | cfg := defaultNotificationsConfig(method) |
| 278 | |
| 279 | // Tune the queue to fetch often. |
| 280 | const fetchInterval = time.Millisecond * 200 |
| 281 | const batchSize = 10 |
| 282 | cfg.FetchInterval = serpent.Duration(fetchInterval) |
| 283 | cfg.LeaseCount = serpent.Int64(batchSize) |
| 284 | // never time out for this test |
| 285 | cfg.LeasePeriod = serpent.Duration(time.Hour) |
| 286 | cfg.DispatchTimeout = serpent.Duration(time.Hour - time.Millisecond) |
| 287 | |
| 288 | // Shrink buffers down and increase flush interval to provoke backpressure. |
| 289 | // Flush buffers every 5 fetch intervals. |
| 290 | const syncInterval = time.Second |
| 291 | cfg.StoreSyncInterval = serpent.Duration(syncInterval) |
| 292 | cfg.StoreSyncBufferSize = serpent.Int64(2) |
| 293 | |
| 294 | handler := &chanHandler{calls: make(chan dispatchCall)} |
| 295 | |
| 296 | // Intercept calls to submit the buffered updates to the store. |
| 297 | storeInterceptor := &syncInterceptor{Store: store} |
| 298 | |
| 299 | mClock := quartz.NewMock(t) |
| 300 | syncTrap := mClock.Trap().NewTicker("Manager", "storeSync") |
| 301 | defer syncTrap.Close() |
| 302 | fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval") |
| 303 | defer fetchTrap.Close() |
| 304 | |
| 305 | // GIVEN: a notification manager whose updates will be intercepted |
| 306 | mgr, err := notifications.NewManager(cfg, storeInterceptor, pubsub, defaultHelpers(), createMetrics(), |
| 307 | logger.Named("manager"), notifications.WithTestClock(mClock)) |
| 308 | require.NoError(t, err) |
| 309 | mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{ |
| 310 | method: handler, |
| 311 | database.NotificationMethodInbox: handler, |
| 312 | }) |
| 313 | enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock) |
| 314 | require.NoError(t, err) |
| 315 | |
| 316 | user := createSampleUser(t, store) |
| 317 | |
| 318 | // WHEN: a set of notifications are enqueued, which causes backpressure due to the batchSize which can be processed per fetch |
| 319 | const totalMessages = 30 |
| 320 | for i := range totalMessages { |
| 321 | _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test") |
| 322 | require.NoError(t, err) |
| 323 | } |
| 324 | |
| 325 | // Start the notifier. |
| 326 | mgr.Run(ctx) |
nothing calls TestBackpressure directly (it is a Go test function, run by the test runner)
no test coverage detected