MCPcopy
hub / github.com/segmentio/kafka-go / TestIssue806

Function TestIssue806

transport_test.go:170–306  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

168}
169
170func TestIssue806(t *testing.T) {
171 // ensure the test times out if the bug is re-introduced
172 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
173 defer cancel()
174
175 // simulate an unknown topic that the client wants auto-created, named unknownTopicName
176 const unknownTopicName = "unknown-topic"
177 const okTopicName = "good-topic"
178
179 // make the connection pool think it's immediately ready to send
180 ready := make(chan struct{})
181 close(ready)
182
183 // allow the system to wake as much as it wants
184 wake := make(chan event)
185 defer close(wake)
186 go func() {
187 for {
188 select {
189 case <-ctx.Done():
190 return
191 case e := <-wake:
192 if e == nil {
193 return
194 }
195 e.trigger()
196 }
197 }
198 }()
199
200 // handle requests by immediately resolving them with a create topics response,
201 // the "unknown topic" will have err UNKNOWN_TOPIC_OR_PARTITION
202 requests := make(chan connRequest, 1)
203 defer close(requests)
204 go func() {
205 request := <-requests
206 request.res.resolve(&meta.Response{
207 Topics: []meta.ResponseTopic{
208 {
209 Name: unknownTopicName,
210 ErrorCode: int16(UnknownTopicOrPartition),
211 },
212 {
213 Name: okTopicName,
214 Partitions: []meta.ResponsePartition{
215 {
216 PartitionIndex: 0,
217 },
218 },
219 },
220 },
221 })
222 }()
223
224 pool := &connPool{
225 ready: ready,
226 wake: wake,
227 conns: map[int32]*connGroup{},

Callers

nothing calls this directly

Calls 6

setStateMethod · 0.95
roundTripMethod · 0.95
DoneMethod · 0.80
resolveMethod · 0.80
triggerMethod · 0.45
ErrMethod · 0.45

Tested by

no test coverage detected