MCPcopy
hub / github.com/grpc/grpc-go / runDisconnectScenario

Function runDisconnectScenario

stats/opentelemetry/e2e_test.go:2231–2344  ·  view source on GitHub ↗

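runDisconnectScenario sets up a functional xDS environment with OpenTelemetry enabled. It performs the following steps: 1. Starts a backend and configures an xDS management server. 2. Establishes a gRPC connection using a custom dialer to capture the underlying transport. 3. Performs a warm-up RPC t… [summary truncated here; the remainder of step 3 is not shown]. 4. Executes the provided 'action' callback to trigger a specific disconnection event. 5. Polls the OpenTelemetry metric reader to verify the 'grpc.disconnect_error' label matches wantLabel.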

(t *testing.T, name, wantLabel string, action func(*stubserver.StubServer, *grpc.ClientConn, func(error)))

Source from the content-addressed store, hash-verified

2229// 4. Executes the provided 'action' callback to trigger a specific disconnection event.
2230// 5. Polls the OpenTelemetry metric reader to verify the 'grpc.disconnect_error' label matches wantLabel.
2231func runDisconnectScenario(t *testing.T, name, wantLabel string, action func(*stubserver.StubServer, *grpc.ClientConn, func(error))) {
2232 backend := stubserver.StartTestService(t, nil)
2233 port := itestutils.ParsePort(t, backend.Address)
2234 defer backend.Stop()
2235
2236 mgmtServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)
2237
2238 serviceName := "service-" + name
2239 clusterName := "cluster-" + serviceName
2240 resources := e2e.DefaultClientResources(e2e.ResourceParams{
2241 DialTarget: serviceName,
2242 NodeID: nodeID,
2243 Host: "localhost",
2244 Port: port,
2245 SecLevel: e2e.SecurityLevelNone,
2246 })
2247
2248 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2249 defer cancel()
2250
2251 if err := mgmtServer.Update(ctx, resources); err != nil {
2252 t.Fatalf("Failed to update xDS resources: %v", err)
2253 }
2254
2255 // Setup Telemetry
2256 reader := metric.NewManualReader()
2257 provider := metric.NewMeterProvider(metric.WithReader(reader))
2258 mo := opentelemetry.MetricsOptions{
2259 MeterProvider: provider,
2260 Metrics: opentelemetry.DefaultMetrics().Add("grpc.subchannel.disconnections"),
2261 OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service", "grpc.disconnect_error"},
2262 }
2263
2264 connCh := make(chan *errorConn, 1)
2265 dialer := func(ctx context.Context, addr string) (net.Conn, error) {
2266 conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr)
2267 if err != nil {
2268 return nil, err
2269 }
2270 activeConn := &errorConn{Conn: conn}
2271 select {
2272 case connCh <- activeConn:
2273 case <-ctx.Done():
2274 activeConn.Conn.Close()
2275 return nil, ctx.Err()
2276 }
2277 return activeConn, nil
2278 }
2279
2280 target := fmt.Sprintf("xds:///%s", serviceName)
2281 dopts := []grpc.DialOption{
2282 grpc.WithTransportCredentials(insecure.NewCredentials()),
2283 grpc.WithResolvers(xdsResolver),
2284 grpc.WithContextDialer(dialer),
2285 opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}),
2286 }
2287 cc, err := grpc.NewClient(target, dopts...)
2288 if err != nil {

Callers 1

Calls 15

EmptyCall (Method) · 0.95
setError (Method) · 0.95
StartTestService (Function) · 0.92
DefaultClientResources (Function) · 0.92
DefaultMetrics (Function) · 0.92
WithTransportCredentials (Function) · 0.92
NewCredentials (Function) · 0.92
WithResolvers (Function) · 0.92
WithContextDialer (Function) · 0.92
DialOption (Function) · 0.92
NewClient (Function) · 0.92

Tested by

no test coverage detected