(t *testing.T)
| 297 | } |
| 298 | |
| 299 | func TestBrokerOpenSASLv1FailThenReopenTransportError(t *testing.T) { |
| 300 | t.Parallel() |
| 301 | |
| 302 | mockBroker := NewMockBroker(t, 0) |
| 303 | defer mockBroker.Close() |
| 304 | |
| 305 | mockBroker.SetHandlerByMap(map[string]MockResponse{ |
| 306 | "ApiVersionsRequest": NewMockApiVersionsResponse(t), |
| 307 | "SaslHandshakeRequest": NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeOAuth}), |
| 308 | "SaslAuthenticateRequest": NewMockSaslAuthenticateResponse(t).SetError(ErrSASLAuthenticationFailed), |
| 309 | }) |
| 310 | |
| 311 | broker := NewBroker(mockBroker.Addr()) |
| 312 | |
| 313 | // first Open: SASL v1 auth fails after b.responses channel is created |
| 314 | conf := NewTestConfig() |
| 315 | conf.Net.SASL.Enable = true |
| 316 | conf.Net.SASL.Mechanism = SASLTypeOAuth |
| 317 | conf.Net.SASL.TokenProvider = newTokenProvider(&AccessToken{Token: "test"}, nil) |
| 318 | conf.Version = V1_0_0_0 |
| 319 | |
| 320 | err := broker.Open(conf) |
| 321 | require.NoError(t, err) |
| 322 | |
| 323 | connected, connErr := broker.Connected() |
| 324 | require.False(t, connected) |
| 325 | require.Error(t, connErr) |
| 326 | |
| 327 | // second Open with a transport error during ApiVersions must not panic |
| 328 | // with "close of closed channel" |
| 329 | conf2 := NewTestConfig() |
| 330 | conf2.ApiVersionsRequest = true |
| 331 | conf2.Net.Proxy.Enable = true |
| 332 | conf2.Net.Proxy.Dialer = closeImmediatelyDialer{} |
| 333 | |
| 334 | require.NotErrorIs(t, broker.Open(conf2), ErrAlreadyConnected) |
| 335 | _, _ = broker.Connected() |
| 336 | } |
| 337 | |
| 338 | func TestBrokerFetch(t *testing.T) { |
| 339 | t.Run("metric mark does not race with concurrent reopen", func(t *testing.T) { |
nothing calls this directly
no test coverage detected