(t *testing.T)
| 1094 | } |
| 1095 | |
| 1096 | func TestAsyncProducerRetryShutdown(t *testing.T) { |
| 1097 | seedBroker := NewMockBroker(t, 1) |
| 1098 | leader := NewMockBroker(t, 2) |
| 1099 | |
| 1100 | metadataLeader := new(MetadataResponse) |
| 1101 | metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) |
| 1102 | metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 1103 | seedBroker.Returns(metadataLeader) |
| 1104 | |
| 1105 | config := NewTestConfig() |
| 1106 | config.Producer.Flush.Messages = 10 |
| 1107 | config.Producer.Return.Successes = true |
| 1108 | config.Producer.Retry.Backoff = 0 |
| 1109 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 1110 | if err != nil { |
| 1111 | t.Fatal(err) |
| 1112 | } |
| 1113 | |
| 1114 | for range 10 { |
| 1115 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 1116 | } |
| 1117 | producer.AsyncClose() |
| 1118 | time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in |
| 1119 | |
| 1120 | producer.Input() <- &ProducerMessage{Topic: "FOO"} |
| 1121 | if err := <-producer.Errors(); !errors.Is(err.Err, ErrShuttingDown) { |
| 1122 | t.Error(err) |
| 1123 | } |
| 1124 | |
| 1125 | prodNotLeader := new(ProduceResponse) |
| 1126 | prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) |
| 1127 | leader.Returns(prodNotLeader) |
| 1128 | |
| 1129 | leader.Returns(metadataLeader) |
| 1130 | |
| 1131 | prodSuccess := new(ProduceResponse) |
| 1132 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 1133 | leader.Returns(prodSuccess) |
| 1134 | expectResults(t, producer, 10, 0) |
| 1135 | |
| 1136 | seedBroker.Close() |
| 1137 | leader.Close() |
| 1138 | |
| 1139 | // wait for the async-closed producer to shut down fully |
| 1140 | for err := range producer.Errors() { |
| 1141 | t.Error(err) |
| 1142 | } |
| 1143 | } |
| 1144 | |
| 1145 | func TestAsyncProducerNoReturns(t *testing.T) { |
| 1146 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected