(t *testing.T)
| 163 | } |
| 164 | |
| 165 | func TestSyncProducerBatch(t *testing.T) { |
| 166 | seedBroker := NewMockBroker(t, 1) |
| 167 | leader := NewMockBroker(t, 2) |
| 168 | |
| 169 | metadataResponse := new(MetadataResponse) |
| 170 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 171 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 172 | seedBroker.Returns(metadataResponse) |
| 173 | |
| 174 | prodSuccess := new(ProduceResponse) |
| 175 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 176 | leader.Returns(prodSuccess) |
| 177 | |
| 178 | config := NewTestConfig() |
| 179 | config.Producer.Flush.Messages = 3 |
| 180 | config.Producer.Return.Successes = true |
| 181 | producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) |
| 182 | if err != nil { |
| 183 | t.Fatal(err) |
| 184 | } |
| 185 | |
| 186 | err = producer.SendMessages([]*ProducerMessage{ |
| 187 | { |
| 188 | Topic: "my_topic", |
| 189 | Value: StringEncoder(TestMessage), |
| 190 | Metadata: "test", |
| 191 | }, |
| 192 | { |
| 193 | Topic: "my_topic", |
| 194 | Value: StringEncoder(TestMessage), |
| 195 | Metadata: "test", |
| 196 | }, |
| 197 | { |
| 198 | Topic: "my_topic", |
| 199 | Value: StringEncoder(TestMessage), |
| 200 | Metadata: "test", |
| 201 | }, |
| 202 | }) |
| 203 | |
| 204 | if err != nil { |
| 205 | t.Error(err) |
| 206 | } |
| 207 | |
| 208 | safeClose(t, producer) |
| 209 | leader.Close() |
| 210 | seedBroker.Close() |
| 211 | } |
| 212 | |
| 213 | func TestConcurrentSyncProducer(t *testing.T) { |
| 214 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected