(t *testing.T)
| 199 | } |
| 200 | |
| 201 | func TestBrokerClose(t *testing.T) { |
| 202 | t.Run("interrupts active async response reads", func(t *testing.T) { |
| 203 | mockBroker := NewMockBroker(t, 0) |
| 204 | defer mockBroker.Close() |
| 205 | |
| 206 | broker := NewBroker(mockBroker.Addr()) |
| 207 | conf := NewTestConfig() |
| 208 | conf.ApiVersionsRequest = false |
| 209 | conf.Net.ReadTimeout = 30 * time.Second |
| 210 | |
| 211 | require.NoError(t, broker.Open(conf)) |
| 212 | |
| 213 | request := ProduceRequest{} |
| 214 | request.RequiredAcks = WaitForLocal |
| 215 | |
| 216 | responseErrs := make(chan error, 1) |
| 217 | err := broker.AsyncProduce(&request, func(_ *ProduceResponse, err error) { |
| 218 | responseErrs <- err |
| 219 | }) |
| 220 | require.NoError(t, err) |
| 221 | |
| 222 | closeErrs := make(chan error, 1) |
| 223 | go func() { |
| 224 | closeErrs <- broker.Close() |
| 225 | }() |
| 226 | |
| 227 | select { |
| 228 | case err := <-closeErrs: |
| 229 | require.NoError(t, err) |
| 230 | case <-time.After(250 * time.Millisecond): |
| 231 | require.FailNow(t, "Close blocked with an active async response read") |
| 232 | } |
| 233 | |
| 234 | select { |
| 235 | case err := <-responseErrs: |
| 236 | require.Error(t, err) |
| 237 | case <-time.After(time.Second): |
| 238 | require.FailNow(t, "timed out waiting for the async produce callback") |
| 239 | } |
| 240 | |
| 241 | connected, err := broker.Connected() |
| 242 | require.NoError(t, err) |
| 243 | require.False(t, connected) |
| 244 | }) |
| 245 | } |
| 246 | |
| 247 | // closeImmediatelyDialer is a test dialer that returns a net.Conn whose peer is |
| 248 | // already closed. This reliably triggers a transport-level failure (e.g. EOF) |
nothing calls this directly
no test coverage detected