singleBrokerAdmin wires a single mock broker that names itself controller, installs the given per-request handlers, and returns a ready admin client.
(t *testing.T, version KafkaVersion, handlers map[string]requestHandlerFunc)
| 1455 | // singleBrokerAdmin wires a single mock broker that names itself controller, |
| 1456 | // installs the given per-request handlers, and returns a ready admin client. |
| 1457 | func singleBrokerAdmin(t *testing.T, version KafkaVersion, handlers map[string]requestHandlerFunc) ClusterAdmin { |
| 1458 | b := NewMockBroker(t, 1) |
| 1459 | t.Cleanup(func() { b.Close() }) |
| 1460 | |
| 1461 | hm := map[string]requestHandlerFunc{ |
| 1462 | "MetadataRequest": func(req *request) encoderWithHeader { |
| 1463 | return NewMockMetadataResponse(t). |
| 1464 | SetController(b.BrokerID()). |
| 1465 | SetBroker(b.Addr(), b.BrokerID()).For(req.body) |
| 1466 | }, |
| 1467 | } |
| 1468 | maps.Copy(hm, handlers) |
| 1469 | b.SetHandlerFuncByMap(hm) |
| 1470 | |
| 1471 | config := NewTestConfig() |
| 1472 | config.Version = version |
| 1473 | admin, err := NewClusterAdmin([]string{b.Addr()}, config) |
| 1474 | require.NoError(t, err) |
| 1475 | t.Cleanup(func() { _ = admin.Close() }) |
| 1476 | |
| 1477 | return admin |
| 1478 | } |
| 1479 | |
| 1480 | // staleControllerAdmin wires two mock brokers into a controller-failover |
| 1481 | // scenario and returns a ready admin client. Broker 1 answers reqType with |
no test coverage detected