MCPcopy
hub / github.com/IBM/sarama / staleControllerAdmin

Function staleControllerAdmin

admin_test.go:1485–1522  ·  view source on GitHub ↗

staleControllerAdmin wires two mock brokers into a controller-failover scenario and returns a ready admin client. Broker 1 answers reqType with errResp and flips fresh metadata to name broker 2 as controller, modeling a stale cached controller; broker 2 answers with okResp. retriedOnNewController reports whether broker 2 received the request.

(t *testing.T, version KafkaVersion, reqType string, errResp, okResp requestHandlerFunc)

Source from the content-addressed store, hash-verified

1483// stale cached controller; broker 2 answers with okResp. retriedOnNewController
1484// reports whether broker 2 received the request.
1485func staleControllerAdmin(t *testing.T, version KafkaVersion, reqType string, errResp, okResp requestHandlerFunc) (admin ClusterAdmin, retriedOnNewController func() bool) {
1486 b1 := NewMockBroker(t, 1)
1487 b2 := NewMockBroker(t, 2)
1488 t.Cleanup(func() { b1.Close(); b2.Close() })
1489
1490 var controllerID atomic.Int32
1491 controllerID.Store(b1.BrokerID())
1492 meta := func(req *request) encoderWithHeader {
1493 return NewMockMetadataResponse(t).SetController(controllerID.Load()).
1494 SetBroker(b1.Addr(), b1.BrokerID()).
1495 SetBroker(b2.Addr(), b2.BrokerID()).For(req.body)
1496 }
1497
1498 var newControllerReceivedRequest atomic.Bool
1499 b1.SetHandlerFuncByMap(map[string]requestHandlerFunc{
1500 "MetadataRequest": meta,
1501 reqType: func(req *request) encoderWithHeader {
1502 controllerID.Store(b2.BrokerID()) // flip so the post-error refresh elects broker 2
1503 return errResp(req)
1504 },
1505 })
1506 b2.SetHandlerFuncByMap(map[string]requestHandlerFunc{
1507 "MetadataRequest": meta,
1508 reqType: func(req *request) encoderWithHeader {
1509 newControllerReceivedRequest.Store(true)
1510 return okResp(req)
1511 },
1512 })
1513
1514 config := NewTestConfig()
1515 config.Version = version
1516 config.Admin.Retry.Backoff = time.Millisecond
1517 admin, err := NewClusterAdmin([]string{b1.Addr()}, config)
1518 require.NoError(t, err)
1519 t.Cleanup(func() { _ = admin.Close() })
1520
1521 return admin, newControllerReceivedRequest.Load
1522}
1523
1524func TestClusterAdminListAcls(t *testing.T) {
1525 resourceName := "my_topic"

Calls 13

CloseMethod · 0.95
BrokerIDMethod · 0.95
AddrMethod · 0.95
SetHandlerFuncByMapMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewClusterAdminFunction · 0.85
SetBrokerMethod · 0.80
SetControllerMethod · 0.80
NewTestConfigFunction · 0.70
CleanupMethod · 0.65

Tested by

no test coverage detected