hub / github.com/segmentio/kafka-go / Close

Method Close

writer.go:555–580

Close flushes pending writes and waits for all of them to complete before returning. It also prevents new writes from being submitted to the writer: subsequent calls to WriteMessages and similar methods fail with io.ErrClosedPipe.

func (w *Writer) Close() error

Source from the content-addressed store, hash-verified

// the writer, further calls to WriteMessages and the like will fail with
// io.ErrClosedPipe.
func (w *Writer) Close() error {
	w.mutex.Lock()
	// Marking the writer as closed here causes future calls to WriteMessages to
	// fail with io.ErrClosedPipe. Mutation of this field is synchronized on the
	// writer's mutex to ensure that no more increments of the wait group are
	// performed afterwards (which could otherwise race with the Wait below).
	w.closed = true

	// close all writers to trigger any pending batches
	for _, writer := range w.writers {
		writer.close()
	}

	for partition := range w.writers {
		delete(w.writers, partition)
	}

	w.mutex.Unlock()
	w.group.Wait()

	if w.transport != nil {
		w.transport.CloseIdleConnections()
	}

	return nil
}

// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.

Calls 2

CloseIdleConnections · Method · 0.80
close · Method · 0.45