Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer, further calls to WriteMessages and the like will fail with io.ErrClosedPipe.
()
| 553 | // the writer, further calls to WriteMessages and the like will fail with |
| 554 | // io.ErrClosedPipe. |
| 555 | func (w *Writer) Close() error { |
| 556 | w.mutex.Lock() |
| 557 | // Marking the writer as closed here causes future calls to WriteMessages to |
| 558 | // fail with io.ErrClosedPipe. Mutation of this field is synchronized on the |
| 559 | // writer's mutex to ensure that no more increments of the wait group are |
| 560 | // performed afterwards (which could otherwise race with the Wait below). |
| 561 | w.closed = true |
| 562 | |
| 563 | // close all writers to trigger any pending batches |
| 564 | for _, writer := range w.writers { |
| 565 | writer.close() |
| 566 | } |
| 567 | |
| 568 | for partition := range w.writers { |
| 569 | delete(w.writers, partition) |
| 570 | } |
| 571 | |
| 572 | w.mutex.Unlock() |
| 573 | w.group.Wait() |
| 574 | |
| 575 | if w.transport != nil { |
| 576 | w.transport.CloseIdleConnections() |
| 577 | } |
| 578 | |
| 579 | return nil |
| 580 | } |
| 581 | |
| 582 | // WriteMessages writes a batch of messages to the kafka topic configured on this |
| 583 | // writer. |