TestXDSLabels tests that xDS Labels get emitted from OpenTelemetry metrics. This test configures OpenTelemetry with the CSM Plugin Option, and xDS Optional Labels turned on. It then configures a test balancer that updates labels, simulating the cluster_impl picker. It then makes a unary RPC, and exp
(t *testing.T)
| 439 | // test for xDS -> Stats handler and this tests -> OTel -> emission). It also |
| 440 | // tests the optional per call locality label in the same manner. |
| 441 | func (s) TestXDSLabels(t *testing.T) { |
| 442 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 443 | defer cancel() |
| 444 | balancerName := "o11y_balancer_xds_labels" |
| 445 | balancer.Register(base.NewBalancerBuilder(balancerName, &o11yPickerBuilder{}, base.Config{})) |
| 446 | reader := metric.NewManualReader() |
| 447 | provider := metric.NewMeterProvider(metric.WithReader(reader)) |
| 448 | ss := &stubserver.StubServer{ |
| 449 | UnaryCallF: func(_ context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| 450 | return &testpb.SimpleResponse{Payload: &testpb.Payload{ |
| 451 | Body: make([]byte, len(in.GetPayload().GetBody())), |
| 452 | }}, nil |
| 453 | }, |
| 454 | } |
| 455 | |
| 456 | po := newPluginOption(ctx) |
| 457 | sc := fmt.Sprintf(`{"loadBalancingConfig": [{"%s": {}}]}`, balancerName) |
| 458 | dopts := []grpc.DialOption{dialOptionSetCSM(opentelemetry.Options{ |
| 459 | MetricsOptions: opentelemetry.MetricsOptions{ |
| 460 | MeterProvider: provider, |
| 461 | Metrics: opentelemetry.DefaultMetrics(), |
| 462 | OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality", "grpc.lb.backend_service", "grpc.client.call.custom"}, |
| 463 | }, |
| 464 | }, po), grpc.WithDefaultServiceConfig(sc)} |
| 465 | if err := ss.Start(nil, dopts...); err != nil { |
| 466 | t.Fatalf("Error starting endpoint server: %v", err) |
| 467 | } |
| 468 | |
| 469 | defer ss.Stop() |
| 470 | ss.Client.UnaryCall(estats.NewContextWithCustomLabel(ctx, "my-custom-label"), &testpb.SimpleRequest{Payload: &testpb.Payload{ |
| 471 | Body: make([]byte, 10000), |
| 472 | }}, grpc.UseCompressor(gzip.Name)) |
| 473 | |
| 474 | rm := &metricdata.ResourceMetrics{} |
| 475 | reader.Collect(ctx, rm) |
| 476 | |
| 477 | gotMetrics := map[string]metricdata.Metrics{} |
| 478 | for _, sm := range rm.ScopeMetrics { |
| 479 | for _, m := range sm.Metrics { |
| 480 | gotMetrics[m.Name] = m |
| 481 | } |
| 482 | } |
| 483 | |
| 484 | unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") |
| 485 | targetAttr := attribute.String("grpc.target", ss.Target) |
| 486 | unaryStatusAttr := attribute.String("grpc.status", "OK") |
| 487 | |
| 488 | serviceNameAttr := attribute.String("csm.service_name", "service_name_val") |
| 489 | serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val") |
| 490 | localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val") |
| 491 | backendServiceAttr := attribute.String("grpc.lb.backend_service", "grpc.lb.backend_service_val") |
| 492 | meshIDAttr := attribute.String("csm.mesh_id", "unknown") |
| 493 | workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown") |
| 494 | remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown") |
| 495 | remoteWorkloadCanonicalServiceAttr := attribute.String("csm.remote_workload_canonical_service", "unknown") |
| 496 | customLabelAttr := attribute.String("grpc.client.call.custom", "my-custom-label") |
| 497 | |
| 498 | unaryMethodClientSideEnd := []attribute.KeyValue{ |
nothing calls this directly
no test coverage detected