MCPcopy
hub / github.com/elastic/go-elasticsearch / TestBulkIndexerFlushJitter

Function TestBulkIndexerFlushJitter

esutil/bulk_indexer_internal_test.go:2326–2399  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2324}
2325
2326func TestBulkIndexerFlushJitter(t *testing.T) {
2327 t.Run("auto-flush fires with jitter configured", func(t *testing.T) {
2328 es, err := elasticsearch.New(elasticsearch.WithTransportOptions(
2329 elastictransport.WithTransport(&mockTransport{
2330 RoundTripFunc: func(*http.Request) (*http.Response, error) {
2331 return &http.Response{
2332 StatusCode: http.StatusOK,
2333 Status: "200 OK",
2334 Body: io.NopCloser(strings.NewReader(`{"items":[{"index": {}}]}`)),
2335 Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
2336 }, nil
2337 },
2338 }),
2339 ))
2340 if err != nil {
2341 t.Fatalf("Unexpected error: %s", err)
2342 }
2343
2344 bi, err := NewBulkIndexer(BulkIndexerConfig{
2345 NumWorkers: 1,
2346 Client: es,
2347 FlushInterval: 50 * time.Millisecond,
2348 FlushJitter: 50 * time.Millisecond,
2349 })
2350 if err != nil {
2351 t.Fatalf("Unexpected error: %s", err)
2352 }
2353
2354 if err := bi.Add(context.Background(),
2355 BulkIndexerItem{Action: "index", Body: strings.NewReader(`{"title":"foo"}`)}); err != nil {
2356 t.Fatalf("Unexpected error: %s", err)
2357 }
2358
2359 // Upper bound of the jittered interval is 100ms; 300ms is generous.
2360 time.Sleep(300 * time.Millisecond)
2361
2362 stats := bi.Stats()
2363 if stats.NumFlushed != 1 {
2364 t.Errorf("Unexpected NumFlushed: want=1, got=%d", stats.NumFlushed)
2365 }
2366
2367 // Let any in-flight ticker settle before Close to avoid racing the defer flush.
2368 time.Sleep(150 * time.Millisecond)
2369 bi.Close(context.Background())
2370 })
2371
2372 t.Run("default FlushJitter is zero", func(t *testing.T) {
2373 es, err := elasticsearch.New(elasticsearch.WithTransportOptions(
2374 elastictransport.WithTransport(&mockTransport{
2375 RoundTripFunc: func(*http.Request) (*http.Response, error) {
2376 return &http.Response{
2377 StatusCode: http.StatusOK,
2378 Status: "200 OK",
2379 Body: io.NopCloser(strings.NewReader(`{}`)),
2380 Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
2381 }, nil
2382 },
2383 }),

Callers

nothing calls this directly

Calls 5

AddMethod · 0.95
StatsMethod · 0.95
CloseMethod · 0.95
NewBulkIndexerFunction · 0.85
RunMethod · 0.45

Tested by

no test coverage detected