(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8)
| 202 | } |
| 203 | |
| 204 | func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { |
| 205 | h := requestHeader{ |
| 206 | ApiKey: int16(fetch), |
| 207 | ApiVersion: int16(v5), |
| 208 | CorrelationID: correlationID, |
| 209 | ClientID: clientID, |
| 210 | } |
| 211 | h.Size = (h.size() - 4) + |
| 212 | 4 + // replica ID |
| 213 | 4 + // max wait time |
| 214 | 4 + // min bytes |
| 215 | 4 + // max bytes |
| 216 | 1 + // isolation level |
| 217 | 4 + // topic array length |
| 218 | sizeofString(topic) + |
| 219 | 4 + // partition array length |
| 220 | 4 + // partition |
| 221 | 8 + // offset |
| 222 | 8 + // log start offset |
| 223 | 4 // max bytes |
| 224 | |
| 225 | h.writeTo(wb) |
| 226 | wb.writeInt32(-1) // replica ID |
| 227 | wb.writeInt32(milliseconds(maxWait)) |
| 228 | wb.writeInt32(int32(minBytes)) |
| 229 | wb.writeInt32(int32(maxBytes)) |
| 230 | wb.writeInt8(isolationLevel) // isolation level 0 - read uncommitted |
| 231 | |
| 232 | // topic array |
| 233 | wb.writeArrayLen(1) |
| 234 | wb.writeString(topic) |
| 235 | |
| 236 | // partition array |
| 237 | wb.writeArrayLen(1) |
| 238 | wb.writeInt32(partition) |
| 239 | wb.writeInt64(offset) |
| 240 | wb.writeInt64(int64(0)) // log start offset only used when is sent by follower |
| 241 | wb.writeInt32(int32(maxBytes)) |
| 242 | |
| 243 | return wb.Flush() |
| 244 | } |
| 245 | |
| 246 | func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { |
| 247 | h := requestHeader{ |
no test coverage detected