MCP · copy
hub / github.com/segmentio/kafka-go / TestIssue672

Function TestIssue672

transport_test.go:42–168  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

40}
41
42func TestIssue672(t *testing.T) {
43 // ensure the test times out if the bug is re-introduced
44 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
45 defer cancel()
46
47 // we'll simulate a situation with one good topic and one bad topic (bad configuration)
48 const brokenTopicName = "bad-topic"
49 const okTopicName = "good-topic"
50
51 // make the connection pool think it's immediately ready to send
52 ready := make(chan struct{})
53 close(ready)
54
55 // allow the system to wake as much as it wants
56 wake := make(chan event)
57 defer close(wake)
58 go func() {
59 for {
60 select {
61 case <-ctx.Done():
62 return
63 case e := <-wake:
64 if e == nil {
65 return
66 }
67 e.trigger()
68 }
69 }
70 }()
71
72 // handle requests by immediately resolving them with a create topics response,
73 // the "bad topic" will have an error value
74 requests := make(chan connRequest, 1)
75 defer close(requests)
76 go func() {
77 request := <-requests
78 request.res.resolve(&createtopics.Response{
79 ThrottleTimeMs: 0,
80 Topics: []createtopics.ResponseTopic{
81 {
82 Name: brokenTopicName,
83 ErrorCode: int16(InvalidPartitionNumber),
84 ErrorMessage: InvalidPartitionNumber.Description(),
85 },
86 {
87 Name: okTopicName,
88 NumPartitions: 1,
89 ReplicationFactor: 1,
90 },
91 },
92 })
93 }()
94
95 pool := &connPool{
96 ready: ready,
97 wake: wake,
98 conns: map[int32]*connGroup{},
99 }

Callers

nothing calls this directly

Calls 7

setState · Method · 0.95
roundTrip · Method · 0.95
Done · Method · 0.80
resolve · Method · 0.80
Description · Method · 0.80
trigger · Method · 0.45
Err · Method · 0.45

Tested by

no test coverage detected