| 629 | } |
| 630 | |
| 631 | func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) { |
| 632 | frb := r.getOrCreateBlock(topic, partition) |
| 633 | kb, vb := encodeKV(key, value) |
| 634 | if r.LogAppendTime { |
| 635 | timestamp = r.Timestamp |
| 636 | } |
| 637 | msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version} |
| 638 | msgBlock := &MessageBlock{Msg: msg, Offset: offset} |
| 639 | if len(frb.RecordsSet) == 0 { |
| 640 | records := newLegacyRecords(&MessageSet{}) |
| 641 | frb.RecordsSet = []*Records{&records} |
| 642 | } |
| 643 | set := frb.RecordsSet[0].MsgSet |
| 644 | set.Messages = append(set.Messages, msgBlock) |
| 645 | } |
| 646 | |
| 647 | func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) { |
| 648 | frb := r.getOrCreateBlock(topic, partition) |