(key topicPartition, batch *writeBatch)
| 725 | } |
| 726 | |
| 727 | func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) { |
| 728 | timeout := w.writeTimeout() |
| 729 | |
| 730 | ctx, cancel := context.WithTimeout(context.Background(), timeout) |
| 731 | defer cancel() |
| 732 | |
| 733 | return w.client(timeout).Produce(ctx, &ProduceRequest{ |
| 734 | Partition: int(key.partition), |
| 735 | Topic: key.topic, |
| 736 | RequiredAcks: w.RequiredAcks, |
| 737 | Compression: w.Compression, |
| 738 | Records: &writerRecords{ |
| 739 | msgs: batch.msgs, |
| 740 | }, |
| 741 | }) |
| 742 | } |
| 743 | |
| 744 | func (w *Writer) partitions(ctx context.Context, topic string) (int, error) { |
| 745 | client := w.client(w.readTimeout()) |
no test coverage detected