| 166 | } |
| 167 | |
| 168 | func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error { |
| 169 | h := requestHeader{ |
| 170 | ApiKey: int16(fetch), |
| 171 | ApiVersion: int16(v2), |
| 172 | CorrelationID: correlationID, |
| 173 | ClientID: clientID, |
| 174 | } |
| 175 | h.Size = (h.size() - 4) + |
| 176 | 4 + // replica ID |
| 177 | 4 + // max wait time |
| 178 | 4 + // min bytes |
| 179 | 4 + // topic array length |
| 180 | sizeofString(topic) + |
| 181 | 4 + // partition array length |
| 182 | 4 + // partition |
| 183 | 8 + // offset |
| 184 | 4 // max bytes |
| 185 | |
| 186 | h.writeTo(wb) |
| 187 | wb.writeInt32(-1) // replica ID |
| 188 | wb.writeInt32(milliseconds(maxWait)) |
| 189 | wb.writeInt32(int32(minBytes)) |
| 190 | |
| 191 | // topic array |
| 192 | wb.writeArrayLen(1) |
| 193 | wb.writeString(topic) |
| 194 | |
| 195 | // partition array |
| 196 | wb.writeArrayLen(1) |
| 197 | wb.writeInt32(partition) |
| 198 | wb.writeInt64(offset) |
| 199 | wb.writeInt32(int32(maxBytes)) |
| 200 | |
| 201 | return wb.Flush() |
| 202 | } |
| 203 | |
| 204 | func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { |
| 205 | h := requestHeader{ |