MCPcopy
hub / github.com/segmentio/kafka-go / writeBatch

Method writeBatch

writer.go:1101–1185  ·  view source on GitHub ↗
(batch *writeBatch)

Source from the content-addressed store, hash-verified

1099}
1100
1101func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
1102 stats := ptw.w.stats()
1103 stats.batchTime.observe(int64(time.Since(batch.time)))
1104 stats.batchSize.observe(int64(len(batch.msgs)))
1105 stats.batchSizeBytes.observe(batch.bytes)
1106
1107 var res *ProduceResponse
1108 var err error
1109 key := ptw.meta
1110 for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ {
1111 if attempt != 0 {
1112 stats.retries.observe(1)
1113 // TODO: should there be a way to asynchronously cancel this
1114 // operation?
1115 //
1116 // * If all goroutines that added messages to this batch have stopped
1117 // waiting for it, should we abort?
1118 //
1119 // * If the writer has been closed? It reduces the durability
1120 // guarantees to abort, but may be better to avoid long wait times
1121 // on close.
1122 //
1123 delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax())
1124 ptw.w.withLogger(func(log Logger) {
1125 log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
1126 })
1127 time.Sleep(delay)
1128 }
1129
1130 ptw.w.withLogger(func(log Logger) {
1131 log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
1132 })
1133
1134 start := time.Now()
1135 res, err = ptw.w.produce(key, batch)
1136
1137 stats.writes.observe(1)
1138 stats.messages.observe(int64(len(batch.msgs)))
1139 stats.bytes.observe(batch.bytes)
1140 // stats.writeTime used to report the duration of WriteMessages, but the
1141 // implementation was broken and reporting values in the nanoseconds
1142 // range. In kafka-go 0.4, we recycled this value to instead report the
1143 // duration of produce requests, and changed the stats.waitTime value to
1144 // report the time that kafka has throttled the requests for.
1145 stats.writeTime.observe(int64(time.Since(start)))
1146
1147 if res != nil {
1148 err = res.Error
1149 stats.waitTime.observe(int64(res.Throttle))
1150 }
1151
1152 if err == nil {
1153 break
1154 }
1155
1156 stats.errors.observe(1)
1157
1158 ptw.w.withErrorLogger(func(log Logger) {

Callers 1

writeBatchesMethod · 0.95

Calls 14

backoffFunction · 0.85
isTemporaryFunction · 0.85
isTransientNetworkErrorFunction · 0.85
statsMethod · 0.80
maxAttemptsMethod · 0.80
writeBackoffMinMethod · 0.80
writeBackoffMaxMethod · 0.80
produceMethod · 0.80
IsZeroMethod · 0.80
completeMethod · 0.80
PrintfMethod · 0.65
observeMethod · 0.45

Tested by

no test coverage detected