(t *testing.T)
| 136 | } |
| 137 | |
| 138 | func TestAsyncProducer(t *testing.T) { |
| 139 | seedBroker := NewMockBroker(t, 1) |
| 140 | leader := NewMockBroker(t, 2) |
| 141 | |
| 142 | metadataResponse := new(MetadataResponse) |
| 143 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 144 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 145 | seedBroker.Returns(metadataResponse) |
| 146 | |
| 147 | prodSuccess := new(ProduceResponse) |
| 148 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 149 | leader.Returns(prodSuccess) |
| 150 | |
| 151 | config := NewTestConfig() |
| 152 | config.Producer.Flush.Messages = 10 |
| 153 | config.Producer.Return.Successes = true |
| 154 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 155 | if err != nil { |
| 156 | t.Fatal(err) |
| 157 | } |
| 158 | |
| 159 | for i := range 10 { |
| 160 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i} |
| 161 | } |
| 162 | for i := range 10 { |
| 163 | select { |
| 164 | case msg := <-producer.Errors(): |
| 165 | t.Error(msg.Err) |
| 166 | if msg.Msg.flags != 0 { |
| 167 | t.Error("Message had flags set") |
| 168 | } |
| 169 | case msg := <-producer.Successes(): |
| 170 | if msg.flags != 0 { |
| 171 | t.Error("Message had flags set") |
| 172 | } |
| 173 | if msg.Metadata.(int) != i { |
| 174 | t.Error("Message metadata did not match") |
| 175 | } |
| 176 | case <-time.After(time.Second): |
| 177 | t.Errorf("Timeout waiting for msg #%d", i) |
| 178 | goto done |
| 179 | } |
| 180 | } |
| 181 | done: |
| 182 | closeProducer(t, producer) |
| 183 | leader.Close() |
| 184 | seedBroker.Close() |
| 185 | } |
| 186 | |
| 187 | func TestAsyncProducerMultipleFlushes(t *testing.T) { |
| 188 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected