Add adds an item to the indexer. Adding an item after a call to Close() will panic.
(ctx context.Context, item BulkIndexerItem)
| 367 | // |
| 368 | // Adding an item after a call to Close() will panic. |
| 369 | func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error { |
| 370 | atomic.AddUint64(&bi.stats.numAdded, 1) |
| 371 | |
| 372 | if item.ctx == nil { |
| 373 | item.ctx = ctx |
| 374 | } |
| 375 | |
| 376 | // Serialize metadata to JSON |
| 377 | item.marshallMeta() |
| 378 | // Compute length for body & metadata |
| 379 | if err := item.computeLength(); err != nil { |
| 380 | return err |
| 381 | } |
| 382 | |
| 383 | n := bi.addCounter.Add(1) - 1 |
| 384 | numWorkers := uint64(len(bi.workers)) |
| 385 | primary := bi.workers[n%numWorkers] |
| 386 | |
| 387 | // Avoid the O(N) scan when the hashed worker can take the item immediately. |
| 388 | select { |
| 389 | case primary.ch <- item: |
| 390 | return nil |
| 391 | default: |
| 392 | } |
| 393 | |
| 394 | // Spread load: prefer making progress on any idle worker over blocking |
| 395 | // on one busy worker. Start after primary to avoid retrying it and to |
| 396 | // distribute scan pressure evenly under sustained back-pressure. |
| 397 | for i := uint64(1); i < numWorkers; i++ { |
| 398 | w := bi.workers[(n+i)%numWorkers] |
| 399 | select { |
| 400 | case w.ch <- item: |
| 401 | return nil |
| 402 | default: |
| 403 | } |
| 404 | } |
| 405 | |
| 406 | // Back-pressure: all queues full; block until primary drains or the |
| 407 | // context is cancelled. |
| 408 | select { |
| 409 | case <-ctx.Done(): |
| 410 | if bi.config.OnError != nil { |
| 411 | bi.config.OnError(ctx, ctx.Err()) |
| 412 | } |
| 413 | return ctx.Err() |
| 414 | case primary.ch <- item: |
| 415 | } |
| 416 | |
| 417 | return nil |
| 418 | } |
| 419 | |
| 420 | // Close stops the periodic flush, closes the indexer queue channel, |
| 421 | // which triggers the workers to flush and stop. |
nothing calls this directly
no test coverage detected