runDisconnectScenario sets up a functional xDS environment with OpenTelemetry enabled. It performs the following steps: 1. Starts a backend and configures an xDS management server. 2. Establishes a gRPC connection using a custom dialer to capture the underlying transport. 3. Performs a warm-up RPC t
(t *testing.T, name, wantLabel string, action func(*stubserver.StubServer, *grpc.ClientConn, func(error)))
| 2229 | // 4. Executes the provided 'action' callback to trigger a specific disconnection event. |
| 2230 | // 5. Polls the OpenTelemetry metric reader to verify the 'grpc.disconnect_error' label matches wantLabel. |
| 2231 | func runDisconnectScenario(t *testing.T, name, wantLabel string, action func(*stubserver.StubServer, *grpc.ClientConn, func(error))) { |
| 2232 | backend := stubserver.StartTestService(t, nil) |
| 2233 | port := itestutils.ParsePort(t, backend.Address) |
| 2234 | defer backend.Stop() |
| 2235 | |
| 2236 | mgmtServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) |
| 2237 | |
| 2238 | serviceName := "service-" + name |
| 2239 | clusterName := "cluster-" + serviceName |
| 2240 | resources := e2e.DefaultClientResources(e2e.ResourceParams{ |
| 2241 | DialTarget: serviceName, |
| 2242 | NodeID: nodeID, |
| 2243 | Host: "localhost", |
| 2244 | Port: port, |
| 2245 | SecLevel: e2e.SecurityLevelNone, |
| 2246 | }) |
| 2247 | |
| 2248 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 2249 | defer cancel() |
| 2250 | |
| 2251 | if err := mgmtServer.Update(ctx, resources); err != nil { |
| 2252 | t.Fatalf("Failed to update xDS resources: %v", err) |
| 2253 | } |
| 2254 | |
| 2255 | // Set up telemetry. |
| 2256 | reader := metric.NewManualReader() |
| 2257 | provider := metric.NewMeterProvider(metric.WithReader(reader)) |
| 2258 | mo := opentelemetry.MetricsOptions{ |
| 2259 | MeterProvider: provider, |
| 2260 | Metrics: opentelemetry.DefaultMetrics().Add("grpc.subchannel.disconnections"), |
| 2261 | OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service", "grpc.disconnect_error"}, |
| 2262 | } |
| 2263 | |
| 2264 | connCh := make(chan *errorConn, 1) |
| 2265 | dialer := func(ctx context.Context, addr string) (net.Conn, error) { |
| 2266 | conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr) |
| 2267 | if err != nil { |
| 2268 | return nil, err |
| 2269 | } |
| 2270 | activeConn := &errorConn{Conn: conn} |
| 2271 | select { |
| 2272 | case connCh <- activeConn: |
| 2273 | case <-ctx.Done(): |
| 2274 | activeConn.Conn.Close() |
| 2275 | return nil, ctx.Err() |
| 2276 | } |
| 2277 | return activeConn, nil |
| 2278 | } |
| 2279 | |
| 2280 | target := fmt.Sprintf("xds:///%s", serviceName) |
| 2281 | dopts := []grpc.DialOption{ |
| 2282 | grpc.WithTransportCredentials(insecure.NewCredentials()), |
| 2283 | grpc.WithResolvers(xdsResolver), |
| 2284 | grpc.WithContextDialer(dialer), |
| 2285 | opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), |
| 2286 | } |
| 2287 | cc, err := grpc.NewClient(target, dopts...) |
| 2288 | if err != nil { |
no test coverage detected