(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8)
| 244 | } |
| 245 | |
| 246 | func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { |
| 247 | h := requestHeader{ |
| 248 | ApiKey: int16(fetch), |
| 249 | ApiVersion: int16(v10), |
| 250 | CorrelationID: correlationID, |
| 251 | ClientID: clientID, |
| 252 | } |
| 253 | h.Size = (h.size() - 4) + |
| 254 | 4 + // replica ID |
| 255 | 4 + // max wait time |
| 256 | 4 + // min bytes |
| 257 | 4 + // max bytes |
| 258 | 1 + // isolation level |
| 259 | 4 + // session ID |
| 260 | 4 + // session epoch |
| 261 | 4 + // topic array length |
| 262 | sizeofString(topic) + |
| 263 | 4 + // partition array length |
| 264 | 4 + // partition |
| 265 | 4 + // current leader epoch |
| 266 | 8 + // fetch offset |
| 267 | 8 + // log start offset |
| 268 | 4 + // partition max bytes |
| 269 | 4 // forgotten topics data |
| 270 | |
| 271 | h.writeTo(wb) |
| 272 | wb.writeInt32(-1) // replica ID |
| 273 | wb.writeInt32(milliseconds(maxWait)) |
| 274 | wb.writeInt32(int32(minBytes)) |
| 275 | wb.writeInt32(int32(maxBytes)) |
| 276 | wb.writeInt8(isolationLevel) // isolation level 0 - read uncommitted |
| 277 | wb.writeInt32(0) //FIXME |
| 278 | wb.writeInt32(-1) //FIXME |
| 279 | |
| 280 | // topic array |
| 281 | wb.writeArrayLen(1) |
| 282 | wb.writeString(topic) |
| 283 | |
| 284 | // partition array |
| 285 | wb.writeArrayLen(1) |
| 286 | wb.writeInt32(partition) |
| 287 | wb.writeInt32(-1) //FIXME |
| 288 | wb.writeInt64(offset) |
| 289 | wb.writeInt64(int64(0)) // log start offset only used when is sent by follower |
| 290 | wb.writeInt32(int32(maxBytes)) |
| 291 | |
| 292 | // forgotten topics array |
| 293 | wb.writeArrayLen(0) // forgotten topics not supported yet |
| 294 | |
| 295 | return wb.Flush() |
| 296 | } |
| 297 | |
| 298 | func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error { |
| 299 | h := requestHeader{ |
no test coverage detected