MCPcopy
hub / github.com/IBM/sarama / closeProducerWithTimeout

Function closeProducerWithTimeout

helpers_test.go:19–60  ·  view source on GitHub ↗
(t *testing.T, p AsyncProducer, timeout time.Duration)

Source from the content-addressed store, hash-verified

17}
18
19func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) {
20 var wg sync.WaitGroup
21 p.AsyncClose()
22
23 closer := make(chan struct{})
24 timer := time.AfterFunc(timeout, func() {
25 t.Error("timeout")
26 close(closer)
27 })
28 defer timer.Stop()
29
30 wg.Add(2)
31 go func() {
32 defer wg.Done()
33 for {
34 select {
35 case <-closer:
36 return
37 case _, ok := <-p.Successes():
38 if !ok {
39 return
40 }
41 t.Error("Unexpected message on Successes()")
42 }
43 }
44 }()
45 go func() {
46 defer wg.Done()
47 for {
48 select {
49 case <-closer:
50 return
51 case msg, ok := <-p.Errors():
52 if !ok {
53 return
54 }
55 t.Error(msg.Err)
56 }
57 }
58 }()
59 wg.Wait()
60}
61
62func closeProducer(t *testing.T, p AsyncProducer) {
63 closeProducerWithTimeout(t, p, 5*time.Minute)

Callers 2

closeProducerFunction · 0.85

Calls 7

StopMethod · 0.80
AsyncCloseMethod · 0.65
ErrorMethod · 0.65
DoneMethod · 0.65
SuccessesMethod · 0.65
ErrorsMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected