MCPcopy
hub / github.com/IBM/sarama / TestFuncAdminNetworkErrorClosesControllerConnection

Function TestFuncAdminNetworkErrorClosesControllerConnection

functional_client_test.go:29–82  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

27}
28
29func TestFuncAdminNetworkErrorClosesControllerConnection(t *testing.T) {
30 // Goal (IBM/sarama#1162): verify controller reconnection semantics after a TCP reset.
31 // Expected flow:
32 // 1) First metadata request succeeds.
33 // 2) Injected TCP reset makes the next metadata request fail.
34 // 3) Explicit Open triggers automatic reconnection and the subsequent metadata request succeeds.
35 checkKafkaVersion(t, "0.11.0.0")
36 setupFunctionalTest(t)
37 defer teardownFunctionalTest(t)
38
39 kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
40 if err != nil {
41 t.Fatal(err)
42 }
43
44 config := NewFunctionalTestConfig()
45 config.Version = kafkaVersion
46 adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
47 if err != nil {
48 t.Fatal(err)
49 }
50 defer safeClose(t, adminClient)
51
52 controller, err := adminClient.Controller()
53 if err != nil {
54 t.Fatal(err)
55 }
56 if controller.ID() < 0 {
57 t.Fatalf("expected controller broker ID to be resolved, got %d", controller.ID())
58 }
59
60 // Warm up the connection so the proxy toxic applies to an established TCP session.
61 metadataReq := NewMetadataRequest(config.Version, nil)
62 if _, err := controller.GetMetadata(metadataReq); err != nil {
63 t.Fatal(err)
64 }
65
66 proxy := proxyForBrokerID(t, controller.ID())
67 addResetPeerToxic(t, proxy)
68 defer resetProxies(t)
69
70 if _, err := controller.GetMetadata(metadataReq); err == nil {
71 t.Fatal("expected metadata request to fail after injected network error")
72 }
73 // Ensure the injected reset is one-shot; otherwise the proxy will continue
74 // to reset new connections and make reconnection impossible.
75 resetProxies(t)
76
77 // Trigger a reconnect path and retry. It should succeed after the automatic reconnection.
78 _ = controller.Open(config)
79 if _, err := controller.GetMetadata(metadataReq); err != nil {
80 t.Fatalf("expected metadata request to succeed after reopen, got %v", err)
81 }
82}
83
84func TestFuncClientMetadata(t *testing.T) {
85 setupFunctionalTest(t)

Callers

nothing calls this directly

Calls 15

ControllerMethod · 0.95
checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
ParseKafkaVersionFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
NewClusterAdminFunction · 0.85
NewMetadataRequestFunction · 0.85
proxyForBrokerIDFunction · 0.85
addResetPeerToxicFunction · 0.85
resetProxiesFunction · 0.85
FatalMethod · 0.80

Tested by

no test coverage detected