(r *bufio.Reader, size int)
| 305 | } |
| 306 | |
| 307 | func readFetchResponseHeaderV2(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { |
| 308 | var n int32 |
| 309 | var p struct { |
| 310 | Partition int32 |
| 311 | ErrorCode int16 |
| 312 | HighwaterMarkOffset int64 |
| 313 | MessageSetSize int32 |
| 314 | } |
| 315 | |
| 316 | if remain, err = readInt32(r, size, &throttle); err != nil { |
| 317 | return |
| 318 | } |
| 319 | |
| 320 | if remain, err = readInt32(r, remain, &n); err != nil { |
| 321 | return |
| 322 | } |
| 323 | |
| 324 | // This error should never trigger, unless there's a bug in the kafka client |
| 325 | // or server. |
| 326 | if n != 1 { |
| 327 | err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n) |
| 328 | return |
| 329 | } |
| 330 | |
| 331 | // We ignore the topic name because we've requests messages for a single |
| 332 | // topic, unless there's a bug in the kafka server we will have received |
| 333 | // the name of the topic that we requested. |
| 334 | if remain, err = discardString(r, remain); err != nil { |
| 335 | return |
| 336 | } |
| 337 | |
| 338 | if remain, err = readInt32(r, remain, &n); err != nil { |
| 339 | return |
| 340 | } |
| 341 | |
| 342 | // This error should never trigger, unless there's a bug in the kafka client |
| 343 | // or server. |
| 344 | if n != 1 { |
| 345 | err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n) |
| 346 | return |
| 347 | } |
| 348 | |
| 349 | if remain, err = read(r, remain, &p); err != nil { |
| 350 | return |
| 351 | } |
| 352 | |
| 353 | if p.ErrorCode != 0 { |
| 354 | err = Error(p.ErrorCode) |
| 355 | return |
| 356 | } |
| 357 | |
| 358 | // This error should never trigger, unless there's a bug in the kafka client |
| 359 | // or server. |
| 360 | if remain != int(p.MessageSetSize) { |
| 361 | err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", p.MessageSetSize, remain) |
| 362 | return |
| 363 | } |
| 364 |
no test coverage detected