(codec CompressionCodec, msgs ...Message)
| 1129 | } |
| 1130 | |
| 1131 | func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) { |
| 1132 | if len(msgs) == 0 { |
| 1133 | return |
| 1134 | } |
| 1135 | |
| 1136 | writeTime := time.Now() |
| 1137 | for i, msg := range msgs { |
| 1138 | // users may believe they can set the Topic and/or Partition |
| 1139 | // on the kafka message. |
| 1140 | if msg.Topic != "" && msg.Topic != c.topic { |
| 1141 | err = errInvalidWriteTopic |
| 1142 | return |
| 1143 | } |
| 1144 | if msg.Partition != 0 { |
| 1145 | err = errInvalidWritePartition |
| 1146 | return |
| 1147 | } |
| 1148 | |
| 1149 | if msg.Time.IsZero() { |
| 1150 | msgs[i].Time = writeTime |
| 1151 | } |
| 1152 | |
| 1153 | nbytes += len(msg.Key) + len(msg.Value) |
| 1154 | } |
| 1155 | |
| 1156 | var produceVersion apiVersion |
| 1157 | if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil { |
| 1158 | return |
| 1159 | } |
| 1160 | |
| 1161 | err = c.writeOperation( |
| 1162 | func(deadline time.Time, id int32) error { |
| 1163 | now := time.Now() |
| 1164 | deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) |
| 1165 | switch produceVersion { |
| 1166 | case v7: |
| 1167 | recordBatch, err := |
| 1168 | newRecordBatch( |
| 1169 | codec, |
| 1170 | msgs..., |
| 1171 | ) |
| 1172 | if err != nil { |
| 1173 | return err |
| 1174 | } |
| 1175 | return c.wb.writeProduceRequestV7( |
| 1176 | id, |
| 1177 | c.clientID, |
| 1178 | c.topic, |
| 1179 | c.partition, |
| 1180 | deadlineToTimeout(deadline, now), |
| 1181 | int16(atomic.LoadInt32(&c.requiredAcks)), |
| 1182 | c.transactionalID, |
| 1183 | recordBatch, |
| 1184 | ) |
| 1185 | case v3: |
| 1186 | recordBatch, err := |
| 1187 | newRecordBatch( |
| 1188 | codec, |
no test coverage detected