| 296 | } |
| 297 | |
| 298 | func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error { |
| 299 | h := requestHeader{ |
| 300 | ApiKey: int16(listOffsets), |
| 301 | ApiVersion: int16(v1), |
| 302 | CorrelationID: correlationID, |
| 303 | ClientID: clientID, |
| 304 | } |
| 305 | h.Size = (h.size() - 4) + |
| 306 | 4 + // replica ID |
| 307 | 4 + // topic array length |
| 308 | sizeofString(topic) + // topic |
| 309 | 4 + // partition array length |
| 310 | 4 + // partition |
| 311 | 8 // time |
| 312 | |
| 313 | h.writeTo(wb) |
| 314 | wb.writeInt32(-1) // replica ID |
| 315 | |
| 316 | // topic array |
| 317 | wb.writeArrayLen(1) |
| 318 | wb.writeString(topic) |
| 319 | |
| 320 | // partition array |
| 321 | wb.writeArrayLen(1) |
| 322 | wb.writeInt32(partition) |
| 323 | wb.writeInt64(time) |
| 324 | |
| 325 | return wb.Flush() |
| 326 | } |
| 327 | |
| 328 | func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) { |
| 329 | var size int32 |