(batch *writeBatch)
| 1099 | } |
| 1100 | |
| 1101 | func (ptw *partitionWriter) writeBatch(batch *writeBatch) { |
| 1102 | stats := ptw.w.stats() |
| 1103 | stats.batchTime.observe(int64(time.Since(batch.time))) |
| 1104 | stats.batchSize.observe(int64(len(batch.msgs))) |
| 1105 | stats.batchSizeBytes.observe(batch.bytes) |
| 1106 | |
| 1107 | var res *ProduceResponse |
| 1108 | var err error |
| 1109 | key := ptw.meta |
| 1110 | for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ { |
| 1111 | if attempt != 0 { |
| 1112 | stats.retries.observe(1) |
| 1113 | // TODO: should there be a way to asynchronously cancel this |
| 1114 | // operation? |
| 1115 | // |
| 1116 | // * If all goroutines that added messages to this batch have stopped |
| 1117 | // waiting for it, should we abort? |
| 1118 | // |
| 1119 | // * If the writer has been closed? It reduces the durability |
| 1120 | // guarantees to abort, but may be better to avoid long wait times |
| 1121 | // on close. |
| 1122 | // |
| 1123 | delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax()) |
| 1124 | ptw.w.withLogger(func(log Logger) { |
| 1125 | log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition) |
| 1126 | }) |
| 1127 | time.Sleep(delay) |
| 1128 | } |
| 1129 | |
| 1130 | ptw.w.withLogger(func(log Logger) { |
| 1131 | log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition) |
| 1132 | }) |
| 1133 | |
| 1134 | start := time.Now() |
| 1135 | res, err = ptw.w.produce(key, batch) |
| 1136 | |
| 1137 | stats.writes.observe(1) |
| 1138 | stats.messages.observe(int64(len(batch.msgs))) |
| 1139 | stats.bytes.observe(batch.bytes) |
| 1140 | // stats.writeTime used to report the duration of WriteMessages, but the |
| 1141 | // implementation was broken and reporting values in the nanoseconds |
| 1142 | // range. In kafka-go 0.4, we recycled this value to instead report the |
| 1143 | // duration of produce requests, and changed the stats.waitTime value to |
| 1144 | // report the time that kafka has throttled the requests for. |
| 1145 | stats.writeTime.observe(int64(time.Since(start))) |
| 1146 | |
| 1147 | if res != nil { |
| 1148 | err = res.Error |
| 1149 | stats.waitTime.observe(int64(res.Throttle)) |
| 1150 | } |
| 1151 | |
| 1152 | if err == nil { |
| 1153 | break |
| 1154 | } |
| 1155 | |
| 1156 | stats.errors.observe(1) |
| 1157 | |
| 1158 | ptw.w.withErrorLogger(func(log Logger) { |
no test coverage detected