MCPcopy Index your code
hub / github.com/coder/coder / TestBackpressure

Function TestBackpressure

coderd/notifications/notifications_test.go:269–391  ·  view source on GitHub ↗

TestBackpressure validates that delays in processing the buffered updates will result in slowed dequeue rates. As a side effect, this also tests the graceful shutdown and flushing of the buffers.

(t *testing.T)

Source from the content-addressed store, hash-verified

267// TestBackpressure validates that delays in processing the buffered updates will result in slowed dequeue rates.
268// As a side-effect, this also tests the graceful shutdown and flushing of the buffers.
269func TestBackpressure(t *testing.T) {
270 t.Parallel()
271
272 store, pubsub := dbtestutil.NewDB(t)
273 logger := testutil.Logger(t)
274 ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort))
275
276 const method = database.NotificationMethodWebhook
277 cfg := defaultNotificationsConfig(method)
278
279 // Tune the queue to fetch often.
280 const fetchInterval = time.Millisecond * 200
281 const batchSize = 10
282 cfg.FetchInterval = serpent.Duration(fetchInterval)
283 cfg.LeaseCount = serpent.Int64(batchSize)
284 // never time out for this test
285 cfg.LeasePeriod = serpent.Duration(time.Hour)
286 cfg.DispatchTimeout = serpent.Duration(time.Hour - time.Millisecond)
287
288 // Shrink buffers down and increase flush interval to provoke backpressure.
289 // Flush buffers every 5 fetch intervals.
290 const syncInterval = time.Second
291 cfg.StoreSyncInterval = serpent.Duration(syncInterval)
292 cfg.StoreSyncBufferSize = serpent.Int64(2)
293
294 handler := &chanHandler{calls: make(chan dispatchCall)}
295
296 // Intercept calls to submit the buffered updates to the store.
297 storeInterceptor := &syncInterceptor{Store: store}
298
299 mClock := quartz.NewMock(t)
300 syncTrap := mClock.Trap().NewTicker("Manager", "storeSync")
301 defer syncTrap.Close()
302 fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
303 defer fetchTrap.Close()
304
305 // GIVEN: a notification manager whose updates will be intercepted
306 mgr, err := notifications.NewManager(cfg, storeInterceptor, pubsub, defaultHelpers(), createMetrics(),
307 logger.Named("manager"), notifications.WithTestClock(mClock))
308 require.NoError(t, err)
309 mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
310 method: handler,
311 database.NotificationMethodInbox: handler,
312 })
313 enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
314 require.NoError(t, err)
315
316 user := createSampleUser(t, store)
317
318 // WHEN: a set of notifications are enqueued, which causes backpressure due to the batchSize which can be processed per fetch
319 const totalMessages = 30
320 for i := range totalMessages {
321 _, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test")
322 require.NoError(t, err)
323 }
324
325 // Start the notifier.
326 mgr.Run(ctx)

Callers

nothing calls this directly

Calls 15

WithHandlers Method · 0.95
Enqueue Method · 0.95
Run Method · 0.95
Stop Method · 0.95
NewDB Function · 0.92
Logger Function · 0.92
AsNotifier Function · 0.92
Context Function · 0.92
NewManager Function · 0.92
WithTestClock Function · 0.92
NewStoreEnqueuer Function · 0.92
TryReceive Function · 0.92

Tested by

no test coverage detected