| 733 | } |
| 734 | |
| 735 | func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) { |
| 736 | frb := r.getOrCreateBlock(topic, partition) |
| 737 | if len(frb.RecordsSet) == 0 { |
| 738 | records := newDefaultRecords(&RecordBatch{Version: 2}) |
| 739 | frb.RecordsSet = []*Records{&records} |
| 740 | } |
| 741 | batch := frb.RecordsSet[0].RecordBatch |
| 742 | batch.LastOffsetDelta = offset |
| 743 | } |
| 744 | |
| 745 | func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) { |
| 746 | frb := r.getOrCreateBlock(topic, partition) |