flushBuffer writes out the worker buffer. It carries a `//nolint:gocyclo` directive.
(ctx context.Context)
| 682 | // |
| 683 | //nolint:gocyclo |
| 684 | func (w *worker) flushBuffer(ctx context.Context) error { |
| 685 | if w.bi.config.OnFlushStart != nil { |
| 686 | ctx = w.bi.config.OnFlushStart(ctx) |
| 687 | } |
| 688 | |
| 689 | if w.bi.config.OnFlushEnd != nil { |
| 690 | defer func() { w.bi.config.OnFlushEnd(ctx) }() |
| 691 | } |
| 692 | |
| 693 | bufLen := w.buf.Len() |
| 694 | |
| 695 | if bufLen < 1 { |
| 696 | if w.bi.config.DebugLogger != nil { |
| 697 | w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: Buffer empty\n", w.id) |
| 698 | } |
| 699 | return nil |
| 700 | } |
| 701 | |
| 702 | var ( |
| 703 | err error |
| 704 | blk BulkIndexerResponse |
| 705 | ) |
| 706 | |
| 707 | defer func() { |
| 708 | w.items = nil |
| 709 | if w.buf.Cap() > w.bi.config.FlushBytes { |
| 710 | w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes)) |
| 711 | } else { |
| 712 | w.buf.Reset() |
| 713 | } |
| 714 | }() |
| 715 | |
| 716 | if w.bi.config.DebugLogger != nil { |
| 717 | w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: %s\n", w.id, w.buf.String()) |
| 718 | } |
| 719 | |
| 720 | atomic.AddUint64(&w.bi.stats.numRequests, 1) |
| 721 | req := esapi.BulkRequest{ |
| 722 | Index: w.bi.config.Index, |
| 723 | Body: w.buf, |
| 724 | |
| 725 | Pipeline: w.bi.config.Pipeline, |
| 726 | Refresh: w.bi.config.Refresh, |
| 727 | Source: w.bi.config.Source, |
| 728 | SourceExcludes: w.bi.config.SourceExcludes, |
| 729 | SourceIncludes: w.bi.config.SourceIncludes, |
| 730 | Timeout: w.bi.config.Timeout, |
| 731 | WaitForActiveShards: w.bi.config.WaitForActiveShards, |
| 732 | |
| 733 | Pretty: w.bi.config.Pretty, |
| 734 | Human: w.bi.config.Human, |
| 735 | ErrorTrace: w.bi.config.ErrorTrace, |
| 736 | FilterPath: w.bi.config.FilterPath, |
| 737 | Header: w.bi.config.Header.Clone(), |
| 738 | } |
| 739 | if w.bi.config.Routing != "" { |
| 740 | req.Routing = strings.Split(w.bi.config.Routing, ",") |
| 741 | } |
no test coverage detected