MCPcopy
hub / github.com/elastic/go-elasticsearch / run

Method run

esutil/bulk_indexer.go:568–639  ·  view source on GitHub ↗

run launches the worker in a goroutine.

()

Source from the content-addressed store, hash-verified

566
567// run launches the worker in a goroutine.
568func (w *worker) run() {
569 go func() {
570 ctx := context.Background()
571
572 if w.bi.config.DebugLogger != nil {
573 w.bi.config.DebugLogger.Printf("[worker-%03d] Started\n", w.id)
574 }
575 defer func() {
576 w.flush(ctx)
577 w.ticker.Stop()
578 w.bi.wg.Done()
579 }()
580
581 for {
582 select {
583 case <-w.ticker.C:
584 if w.bi.config.DebugLogger != nil {
585 w.bi.config.DebugLogger.Printf("[worker-%03d] Auto-flushing after %s\n",
586 w.id, w.bi.config.FlushInterval)
587 }
588 w.flush(ctx)
589 case item, ok := <-w.ch:
590 if !ok {
591 return
592 }
593
594 if item.flushDone != nil {
595 if w.bi.config.DebugLogger != nil {
596 w.bi.config.DebugLogger.Printf("[worker-%03d] Flush sentinel received\n", w.id)
597 }
598 w.flush(ctx)
599 close(item.flushDone)
600 continue
601 }
602
603 if w.bi.config.DebugLogger != nil {
604 w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
605 }
606
607 oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
608 if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
609 w.flush(ctx)
610 }
611
612 if err := w.writeMeta(&item); err != nil {
613 if item.OnFailure != nil {
614 item.OnFailure(item.ctx, item, BulkIndexerResponseItem{}, err)
615 }
616 atomic.AddUint64(&w.bi.stats.numFailed, 1)
617 continue
618 }
619
620 if err := w.writeBody(&item); err != nil {
621 if item.OnFailure != nil {
622 item.OnFailure(item.ctx, item, BulkIndexerResponseItem{}, err)
623 }
624 atomic.AddUint64(&w.bi.stats.numFailed, 1)
625 continue

Callers 1

initMethod · 0.95

Calls 6

flushMethod · 0.95
writeMetaMethod · 0.95
writeBodyMethod · 0.95
PrintfMethod · 0.80
StopMethod · 0.45
OnFailureMethod · 0.45

Tested by

no test coverage detected