NewBulkIndexer creates a new bulk indexer. If cfg.Client is nil, a default client is created via [elasticsearch.NewBase]; see its documentation for address resolution. The auto-created client is closed by [BulkIndexer.Close]. Any error from the underlying client construction is returned.
(cfg BulkIndexerConfig)
| 322 | // closed by [BulkIndexer.Close]. Any error from the underlying client |
| 323 | // construction is returned. |
| 324 | func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { |
| 325 | var ownedClient *elasticsearch.BaseClient |
| 326 | if cfg.Client == nil { |
| 327 | client, err := elasticsearch.NewBase() |
| 328 | if err != nil { |
| 329 | return nil, err |
| 330 | } |
| 331 | cfg.Client = client |
| 332 | ownedClient = client |
| 333 | } |
| 334 | |
| 335 | if cfg.Decoder == nil { |
| 336 | cfg.Decoder = defaultJSONDecoder{} |
| 337 | } |
| 338 | |
| 339 | if cfg.NumWorkers <= 0 { |
| 340 | cfg.NumWorkers = runtime.NumCPU() |
| 341 | } |
| 342 | |
| 343 | if cfg.FlushBytes <= 0 { |
| 344 | cfg.FlushBytes = 5e+6 |
| 345 | } |
| 346 | |
| 347 | if cfg.FlushInterval <= 0 { |
| 348 | cfg.FlushInterval = 30 * time.Second |
| 349 | } |
| 350 | |
| 351 | if cfg.QueueSizeMultiplier <= 0 { |
| 352 | cfg.QueueSizeMultiplier = 1 |
| 353 | } |
| 354 | |
| 355 | bi := bulkIndexer{ |
| 356 | config: cfg, |
| 357 | stats: &bulkIndexerStats{}, |
| 358 | ownedClient: ownedClient, |
| 359 | } |
| 360 | |
| 361 | bi.init() |
| 362 | |
| 363 | return &bi, nil |
| 364 | } |
| 365 | |
| 366 | // Add adds an item to the indexer. |
| 367 | // |