MCPcopy
hub / github.com/IBM/sarama / TestBrokerClose

Function TestBrokerClose

broker_test.go:201–245  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

199}
200
201func TestBrokerClose(t *testing.T) {
202 t.Run("interrupts active async response reads", func(t *testing.T) {
203 mockBroker := NewMockBroker(t, 0)
204 defer mockBroker.Close()
205
206 broker := NewBroker(mockBroker.Addr())
207 conf := NewTestConfig()
208 conf.ApiVersionsRequest = false
209 conf.Net.ReadTimeout = 30 * time.Second
210
211 require.NoError(t, broker.Open(conf))
212
213 request := ProduceRequest{}
214 request.RequiredAcks = WaitForLocal
215
216 responseErrs := make(chan error, 1)
217 err := broker.AsyncProduce(&request, func(_ *ProduceResponse, err error) {
218 responseErrs <- err
219 })
220 require.NoError(t, err)
221
222 closeErrs := make(chan error, 1)
223 go func() {
224 closeErrs <- broker.Close()
225 }()
226
227 select {
228 case err := <-closeErrs:
229 require.NoError(t, err)
230 case <-time.After(250 * time.Millisecond):
231 require.FailNow(t, "Close blocked with an active async response read")
232 }
233
234 select {
235 case err := <-responseErrs:
236 require.Error(t, err)
237 case <-time.After(time.Second):
238 require.FailNow(t, "timed out waiting for the async produce callback")
239 }
240
241 connected, err := broker.Connected()
242 require.NoError(t, err)
243 require.False(t, connected)
244 })
245}
246
247// closeImmediatelyDialer is a test dialer that returns a net.Conn whose peer is
248// already closed. This reliably triggers a transport-level failure (e.g. EOF)

Callers

nothing calls this directly

Calls 11

CloseMethod · 0.95
AddrMethod · 0.95
OpenMethod · 0.95
AsyncProduceMethod · 0.95
CloseMethod · 0.95
ConnectedMethod · 0.95
NewMockBrokerFunction · 0.85
NewBrokerFunction · 0.85
RunMethod · 0.80
NewTestConfigFunction · 0.70
ErrorMethod · 0.65

Tested by

no test coverage detected