MCPcopy
hub / github.com/elastic/go-elasticsearch / Add

Method Add

esutil/bulk_indexer.go:369–418  ·  view source on GitHub ↗

Add adds an item to the indexer. Adding an item after a call to Close() will panic.

(ctx context.Context, item BulkIndexerItem)

Source from the content-addressed store, hash-verified

367//
368// Adding an item after a call to Close() will panic.
369func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
370 atomic.AddUint64(&bi.stats.numAdded, 1)
371
372 if item.ctx == nil {
373 item.ctx = ctx
374 }
375
376 // Serialize metadata to JSON
377 item.marshallMeta()
378 // Compute length for body & metadata
379 if err := item.computeLength(); err != nil {
380 return err
381 }
382
383 n := bi.addCounter.Add(1) - 1
384 numWorkers := uint64(len(bi.workers))
385 primary := bi.workers[n%numWorkers]
386
387 // Avoid the O(N) scan when the hashed worker can take the item immediately.
388 select {
389 case primary.ch <- item:
390 return nil
391 default:
392 }
393
394 // Spread load: prefer making progress on any idle worker over blocking
395 // on one busy worker. Start after primary to avoid retrying it and to
396 // distribute scan pressure evenly under sustained back-pressure.
397 for i := uint64(1); i < numWorkers; i++ {
398 w := bi.workers[(n+i)%numWorkers]
399 select {
400 case w.ch <- item:
401 return nil
402 default:
403 }
404 }
405
406 // Back-pressure: all queues full; block until primary drains or the
407 // context is cancelled.
408 select {
409 case <-ctx.Done():
410 if bi.config.OnError != nil {
411 bi.config.OnError(ctx, ctx.Err())
412 }
413 return ctx.Err()
414 case primary.ch <- item:
415 }
416
417 return nil
418}
419
420// Close stops the periodic flush, closes the indexer queue channel,
421// which triggers the workers to flush and stop.

Callers

nothing calls this directly

Calls 3

marshallMetaMethod · 0.80
computeLengthMethod · 0.80
AddMethod · 0.65

Tested by

no test coverage detected