(t *testing.T)
| 2324 | } |
| 2325 | |
| 2326 | func TestBulkIndexerFlushJitter(t *testing.T) { |
| 2327 | t.Run("auto-flush fires with jitter configured", func(t *testing.T) { |
| 2328 | es, err := elasticsearch.New(elasticsearch.WithTransportOptions( |
| 2329 | elastictransport.WithTransport(&mockTransport{ |
| 2330 | RoundTripFunc: func(*http.Request) (*http.Response, error) { |
| 2331 | return &http.Response{ |
| 2332 | StatusCode: http.StatusOK, |
| 2333 | Status: "200 OK", |
| 2334 | Body: io.NopCloser(strings.NewReader(`{"items":[{"index": {}}]}`)), |
| 2335 | Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, |
| 2336 | }, nil |
| 2337 | }, |
| 2338 | }), |
| 2339 | )) |
| 2340 | if err != nil { |
| 2341 | t.Fatalf("Unexpected error: %s", err) |
| 2342 | } |
| 2343 | |
| 2344 | bi, err := NewBulkIndexer(BulkIndexerConfig{ |
| 2345 | NumWorkers: 1, |
| 2346 | Client: es, |
| 2347 | FlushInterval: 50 * time.Millisecond, |
| 2348 | FlushJitter: 50 * time.Millisecond, |
| 2349 | }) |
| 2350 | if err != nil { |
| 2351 | t.Fatalf("Unexpected error: %s", err) |
| 2352 | } |
| 2353 | |
| 2354 | if err := bi.Add(context.Background(), |
| 2355 | BulkIndexerItem{Action: "index", Body: strings.NewReader(`{"title":"foo"}`)}); err != nil { |
| 2356 | t.Fatalf("Unexpected error: %s", err) |
| 2357 | } |
| 2358 | |
| 2359 | // Upper bound of the jittered interval is 100ms; 300ms is generous. |
| 2360 | time.Sleep(300 * time.Millisecond) |
| 2361 | |
| 2362 | stats := bi.Stats() |
| 2363 | if stats.NumFlushed != 1 { |
| 2364 | t.Errorf("Unexpected NumFlushed: want=1, got=%d", stats.NumFlushed) |
| 2365 | } |
| 2366 | |
| 2367 | // Let any in-flight ticker settle before Close to avoid racing the defer flush. |
| 2368 | time.Sleep(150 * time.Millisecond) |
| 2369 | bi.Close(context.Background()) |
| 2370 | }) |
| 2371 | |
| 2372 | t.Run("default FlushJitter is zero", func(t *testing.T) { |
| 2373 | es, err := elasticsearch.New(elasticsearch.WithTransportOptions( |
| 2374 | elastictransport.WithTransport(&mockTransport{ |
| 2375 | RoundTripFunc: func(*http.Request) (*http.Response, error) { |
| 2376 | return &http.Response{ |
| 2377 | StatusCode: http.StatusOK, |
| 2378 | Status: "200 OK", |
| 2379 | Body: io.NopCloser(strings.NewReader(`{}`)), |
| 2380 | Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, |
| 2381 | }, nil |
| 2382 | }, |
| 2383 | }), |
nothing calls this directly
no test coverage detected