| 168 | } |
| 169 | |
| 170 | func TestIssue806(t *testing.T) { |
| 171 | // ensure the test times out if the bug is re-introduced |
| 172 | ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) |
| 173 | defer cancel() |
| 174 | |
| 175 | // simulate an unknown topic, which we want auto-created, using unknownTopicName |
| 176 | const unknownTopicName = "unknown-topic" |
| 177 | const okTopicName = "good-topic" |
| 178 | |
| 179 | // make the connection pool think it's immediately ready to send |
| 180 | ready := make(chan struct{}) |
| 181 | close(ready) |
| 182 | |
| 183 | // allow the system to wake as much as it wants |
| 184 | wake := make(chan event) |
| 185 | defer close(wake) |
| 186 | go func() { |
| 187 | for { |
| 188 | select { |
| 189 | case <-ctx.Done(): |
| 190 | return |
| 191 | case e := <-wake: |
| 192 | if e == nil { |
| 193 | return |
| 194 | } |
| 195 | e.trigger() |
| 196 | } |
| 197 | } |
| 198 | }() |
| 199 | |
| 200 | // handle a single request by immediately resolving it with a meta.Response in which |
| 201 | // the "unknown topic" carries the UNKNOWN_TOPIC_OR_PARTITION error code |
| 202 | requests := make(chan connRequest, 1) |
| 203 | defer close(requests) |
| 204 | go func() { |
| 205 | request := <-requests |
| 206 | request.res.resolve(&meta.Response{ |
| 207 | Topics: []meta.ResponseTopic{ |
| 208 | { |
| 209 | Name: unknownTopicName, |
| 210 | ErrorCode: int16(UnknownTopicOrPartition), |
| 211 | }, |
| 212 | { |
| 213 | Name: okTopicName, |
| 214 | Partitions: []meta.ResponsePartition{ |
| 215 | { |
| 216 | PartitionIndex: 0, |
| 217 | }, |
| 218 | }, |
| 219 | }, |
| 220 | }, |
| 221 | }) |
| 222 | }() |
| 223 | |
| 224 | pool := &connPool{ |
| 225 | ready: ready, |
| 226 | wake: wake, |
| 227 | conns: map[int32]*connGroup{}, |