staleControllerAdmin wires two mock brokers into a controller-failover scenario and returns a ready admin client. Broker 1 answers reqType with errResp and flips fresh metadata to name broker 2 as controller, modeling a stale cached controller; broker 2 answers with okResp. retriedOnNewController re
(t *testing.T, version KafkaVersion, reqType string, errResp, okResp requestHandlerFunc)
| 1483 | // stale cached controller; broker 2 answers with okResp. retriedOnNewController |
| 1484 | // reports whether broker 2 received the request. |
| 1485 | func staleControllerAdmin(t *testing.T, version KafkaVersion, reqType string, errResp, okResp requestHandlerFunc) (admin ClusterAdmin, retriedOnNewController func() bool) { |
| 1486 | b1 := NewMockBroker(t, 1) |
| 1487 | b2 := NewMockBroker(t, 2) |
| 1488 | t.Cleanup(func() { b1.Close(); b2.Close() }) |
| 1489 | |
| 1490 | var controllerID atomic.Int32 |
| 1491 | controllerID.Store(b1.BrokerID()) |
| 1492 | meta := func(req *request) encoderWithHeader { |
| 1493 | return NewMockMetadataResponse(t).SetController(controllerID.Load()). |
| 1494 | SetBroker(b1.Addr(), b1.BrokerID()). |
| 1495 | SetBroker(b2.Addr(), b2.BrokerID()).For(req.body) |
| 1496 | } |
| 1497 | |
| 1498 | var newControllerReceivedRequest atomic.Bool |
| 1499 | b1.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
| 1500 | "MetadataRequest": meta, |
| 1501 | reqType: func(req *request) encoderWithHeader { |
| 1502 | controllerID.Store(b2.BrokerID()) // flip so the post-error refresh elects broker 2 |
| 1503 | return errResp(req) |
| 1504 | }, |
| 1505 | }) |
| 1506 | b2.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
| 1507 | "MetadataRequest": meta, |
| 1508 | reqType: func(req *request) encoderWithHeader { |
| 1509 | newControllerReceivedRequest.Store(true) |
| 1510 | return okResp(req) |
| 1511 | }, |
| 1512 | }) |
| 1513 | |
| 1514 | config := NewTestConfig() |
| 1515 | config.Version = version |
| 1516 | config.Admin.Retry.Backoff = time.Millisecond |
| 1517 | admin, err := NewClusterAdmin([]string{b1.Addr()}, config) |
| 1518 | require.NoError(t, err) |
| 1519 | t.Cleanup(func() { _ = admin.Close() }) |
| 1520 | |
| 1521 | return admin, newControllerReceivedRequest.Load |
| 1522 | } |
| 1523 | |
| 1524 | func TestClusterAdminListAcls(t *testing.T) { |
| 1525 | resourceName := "my_topic" |
no test coverage detected