(t *testing.T)
| 158 | } |
| 159 | |
| 160 | func TestBrokerFailedRequest(t *testing.T) { |
| 161 | for _, tt := range brokerFailedReqTestTable { |
| 162 | t.Run(tt.name, func(t *testing.T) { |
| 163 | t.Logf("Testing broker communication for %s", tt.name) |
| 164 | mb := NewMockBroker(t, 0) |
| 165 | if !tt.stopBroker { |
| 166 | mb.Returns(&mockEncoder{tt.response}) |
| 167 | } |
| 168 | broker := NewBroker(mb.Addr()) |
| 169 | // Stop the broker before calling the runner to purposefully |
| 170 | // make the request fail right away, the port will be closed |
| 171 | // and should not be reused right away |
| 172 | if tt.stopBroker { |
| 173 | t.Log("Closing broker:", mb.Addr()) |
| 174 | mb.Close() |
| 175 | } |
| 176 | conf := NewTestConfig() |
| 177 | conf.ApiVersionsRequest = false |
| 178 | conf.Version = tt.version |
| 179 | // Tune read timeout to speed up some test cases |
| 180 | conf.Net.ReadTimeout = 1 * time.Second |
| 181 | err := broker.Open(conf) |
| 182 | if err != nil { |
| 183 | t.Fatal(err) |
| 184 | } |
| 185 | tt.runner(t, broker) |
| 186 | if !tt.stopBroker { |
| 187 | mb.Close() |
| 188 | } |
| 189 | err = broker.Close() |
| 190 | if err != nil { |
| 191 | if tt.stopBroker && errors.Is(err, ErrNotConnected) { |
| 192 | // We expect the broker to not close properly |
| 193 | return |
| 194 | } |
| 195 | t.Error(err) |
| 196 | } |
| 197 | }) |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | func TestBrokerClose(t *testing.T) { |
| 202 | t.Run("interrupts active async response reads", func(t *testing.T) { |
nothing calls this directly
no test coverage detected