| 905 | } |
| 906 | |
| 907 | func (w *Writer) chooseTopic(msg Message) (string, error) { |
| 908 | // w.Topic and msg.Topic are mutually exclusive, meaning only 1 must be set |
| 909 | // otherwise we will return an error. |
| 910 | if w.Topic != "" && msg.Topic != "" { |
| 911 | return "", errors.New("kafka.(*Writer): Topic must not be specified for both Writer and Message") |
| 912 | } else if w.Topic == "" && msg.Topic == "" { |
| 913 | return "", errors.New("kafka.(*Writer): Topic must be specified for Writer or Message") |
| 914 | } |
| 915 | |
| 916 | // now we choose the topic, depending on which one is not empty |
| 917 | if msg.Topic != "" { |
| 918 | return msg.Topic, nil |
| 919 | } |
| 920 | |
| 921 | return w.Topic, nil |
| 922 | } |
| 923 | |
| 924 | type batchQueue struct { |
| 925 | queue []*writeBatch |