| 313 | } |
| 314 | |
| 315 | func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse { |
| 316 | mfr.messagesLock.Lock() |
| 317 | defer mfr.messagesLock.Unlock() |
| 318 | partitions := mfr.messages[topic] |
| 319 | if partitions == nil { |
| 320 | partitions = make(map[int32]map[int64]*mockMessage) |
| 321 | mfr.messages[topic] = partitions |
| 322 | } |
| 323 | messages := partitions[partition] |
| 324 | if messages == nil { |
| 325 | messages = make(map[int64]*mockMessage) |
| 326 | partitions[partition] = messages |
| 327 | } |
| 328 | messages[offset] = newMockMessage(key, msg) |
| 329 | return mfr |
| 330 | } |
| 331 | |
| 332 | func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse { |
| 333 | partitions := mfr.highWaterMarks[topic] |