(t *testing.T)
| 27 | } |
| 28 | |
| 29 | func TestFuncAdminNetworkErrorClosesControllerConnection(t *testing.T) { |
| 30 | // Goal (IBM/sarama#1162): verify controller reconnection semantics after a TCP reset. |
| 31 | // Expected flow: |
| 32 | // 1) First metadata request succeeds. |
| 33 | // 2) Injected TCP reset makes the next metadata request fail. |
| 34 | // 3) Explicit Open triggers automatic reconnection and the subsequent metadata request succeeds. |
| 35 | checkKafkaVersion(t, "0.11.0.0") |
| 36 | setupFunctionalTest(t) |
| 37 | defer teardownFunctionalTest(t) |
| 38 | |
| 39 | kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion) |
| 40 | if err != nil { |
| 41 | t.Fatal(err) |
| 42 | } |
| 43 | |
| 44 | config := NewFunctionalTestConfig() |
| 45 | config.Version = kafkaVersion |
| 46 | adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 47 | if err != nil { |
| 48 | t.Fatal(err) |
| 49 | } |
| 50 | defer safeClose(t, adminClient) |
| 51 | |
| 52 | controller, err := adminClient.Controller() |
| 53 | if err != nil { |
| 54 | t.Fatal(err) |
| 55 | } |
| 56 | if controller.ID() < 0 { |
| 57 | t.Fatalf("expected controller broker ID to be resolved, got %d", controller.ID()) |
| 58 | } |
| 59 | |
| 60 | // Warm up the connection so the proxy toxic applies to an established TCP session. |
| 61 | metadataReq := NewMetadataRequest(config.Version, nil) |
| 62 | if _, err := controller.GetMetadata(metadataReq); err != nil { |
| 63 | t.Fatal(err) |
| 64 | } |
| 65 | |
| 66 | proxy := proxyForBrokerID(t, controller.ID()) |
| 67 | addResetPeerToxic(t, proxy) |
| 68 | defer resetProxies(t) |
| 69 | |
| 70 | if _, err := controller.GetMetadata(metadataReq); err == nil { |
| 71 | t.Fatal("expected metadata request to fail after injected network error") |
| 72 | } |
| 73 | // Ensure the injected reset is one-shot; otherwise the proxy will continue |
| 74 | // to reset new connections and make reconnection impossible. |
| 75 | resetProxies(t) |
| 76 | |
| 77 | // Trigger a reconnect path and retry. It should succeed after the automatic reconnection. |
| 78 | _ = controller.Open(config) |
| 79 | if _, err := controller.GetMetadata(metadataReq); err != nil { |
| 80 | t.Fatalf("expected metadata request to succeed after reopen, got %v", err) |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | func TestFuncClientMetadata(t *testing.T) { |
| 85 | setupFunctionalTest(t) |
nothing calls this directly
no test coverage detected