(t *testing.T)
| 336 | } |
| 337 | |
| 338 | func TestBrokerFetch(t *testing.T) { |
| 339 | t.Run("metric mark does not race with concurrent reopen", func(t *testing.T) { |
| 340 | mb := NewMockBroker(t, 1) |
| 341 | defer mb.Close() |
| 342 | mb.SetHandlerByMap(map[string]MockResponse{ |
| 343 | "FetchRequest": NewMockFetchResponse(t, 1), |
| 344 | }) |
| 345 | |
| 346 | broker := NewBroker(mb.Addr()) |
| 347 | conf := NewTestConfig() |
| 348 | conf.Version = V0_11_0_0 |
| 349 | require.NoError(t, broker.Open(conf)) |
| 350 | t.Cleanup(func() { _ = broker.Close() }) |
| 351 | |
| 352 | req := &FetchRequest{MaxWaitTime: 100, MinBytes: 1, Version: 4} |
| 353 | |
| 354 | var wg sync.WaitGroup |
| 355 | wg.Go(func() { |
| 356 | for range 500 { |
| 357 | _, _ = broker.Fetch(req) |
| 358 | } |
| 359 | }) |
| 360 | wg.Go(func() { |
| 361 | for range 50 { |
| 362 | _ = broker.Close() |
| 363 | _ = broker.Open(conf) |
| 364 | } |
| 365 | }) |
| 366 | wg.Wait() |
| 367 | }) |
| 368 | } |
| 369 | |
| 370 | var ErrTokenFailure = errors.New("Failure generating token") |
| 371 |
nothing calls this directly
no test coverage detected