run launches the worker in a goroutine.
()
| 566 | |
| 567 | // run launches the worker in a goroutine. |
| 568 | func (w *worker) run() { |
| 569 | go func() { |
| 570 | ctx := context.Background() |
| 571 | |
| 572 | if w.bi.config.DebugLogger != nil { |
| 573 | w.bi.config.DebugLogger.Printf("[worker-%03d] Started\n", w.id) |
| 574 | } |
| 575 | defer func() { |
| 576 | w.flush(ctx) |
| 577 | w.ticker.Stop() |
| 578 | w.bi.wg.Done() |
| 579 | }() |
| 580 | |
| 581 | for { |
| 582 | select { |
| 583 | case <-w.ticker.C: |
| 584 | if w.bi.config.DebugLogger != nil { |
| 585 | w.bi.config.DebugLogger.Printf("[worker-%03d] Auto-flushing after %s\n", |
| 586 | w.id, w.bi.config.FlushInterval) |
| 587 | } |
| 588 | w.flush(ctx) |
| 589 | case item, ok := <-w.ch: |
| 590 | if !ok { |
| 591 | return |
| 592 | } |
| 593 | |
| 594 | if item.flushDone != nil { |
| 595 | if w.bi.config.DebugLogger != nil { |
| 596 | w.bi.config.DebugLogger.Printf("[worker-%03d] Flush sentinel received\n", w.id) |
| 597 | } |
| 598 | w.flush(ctx) |
| 599 | close(item.flushDone) |
| 600 | continue |
| 601 | } |
| 602 | |
| 603 | if w.bi.config.DebugLogger != nil { |
| 604 | w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID) |
| 605 | } |
| 606 | |
| 607 | oversizePayload := w.bi.config.FlushBytes <= item.payloadLength |
| 608 | if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes { |
| 609 | w.flush(ctx) |
| 610 | } |
| 611 | |
| 612 | if err := w.writeMeta(&item); err != nil { |
| 613 | if item.OnFailure != nil { |
| 614 | item.OnFailure(item.ctx, item, BulkIndexerResponseItem{}, err) |
| 615 | } |
| 616 | atomic.AddUint64(&w.bi.stats.numFailed, 1) |
| 617 | continue |
| 618 | } |
| 619 | |
| 620 | if err := w.writeBody(&item); err != nil { |
| 621 | if item.OnFailure != nil { |
| 622 | item.OnFailure(item.ctx, item, BulkIndexerResponseItem{}, err) |
| 623 | } |
| 624 | atomic.AddUint64(&w.bi.stats.numFailed, 1) |
| 625 | continue |