| 458 | } |
| 459 | |
| 460 | func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { |
| 461 | var n int32 |
| 462 | var errorCode int16 |
| 463 | type AbortedTransaction struct { |
| 464 | ProducerId int64 |
| 465 | FirstOffset int64 |
| 466 | } |
| 467 | var p struct { |
| 468 | Partition int32 |
| 469 | ErrorCode int16 |
| 470 | HighwaterMarkOffset int64 |
| 471 | LastStableOffset int64 |
| 472 | LogStartOffset int64 |
| 473 | } |
| 474 | var messageSetSize int32 |
| 475 | var abortedTransactions []AbortedTransaction |
| 476 | |
| 477 | if remain, err = readInt32(r, size, &throttle); err != nil { |
| 478 | return |
| 479 | } |
| 480 | |
| 481 | if remain, err = readInt16(r, remain, &errorCode); err != nil { |
| 482 | return |
| 483 | } |
| 484 | if errorCode != 0 { |
| 485 | err = Error(errorCode) |
| 486 | return |
| 487 | } |
| 488 | |
| 489 | if remain, err = discardInt32(r, remain); err != nil { |
| 490 | return |
| 491 | } |
| 492 | |
| 493 | if remain, err = readInt32(r, remain, &n); err != nil { |
| 494 | return |
| 495 | } |
| 496 | |
| 497 | // This error should never trigger, unless there's a bug in the kafka client |
| 498 | // or server. |
| 499 | if n != 1 { |
| 500 | err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n) |
| 501 | return |
| 502 | } |
| 503 | |
| 504 | // We ignore the topic name because we've requested messages for a single |
| 505 | // topic, unless there's a bug in the kafka server we will have received |
| 506 | // the name of the topic that we requested. |
| 507 | if remain, err = discardString(r, remain); err != nil { |
| 508 | return |
| 509 | } |
| 510 | |
| 511 | if remain, err = readInt32(r, remain, &n); err != nil { |
| 512 | return |
| 513 | } |
| 514 | |
| 515 | // This error should never trigger, unless there's a bug in the kafka client |
| 516 | // or server. |
| 517 | if n != 1 { |