TestCSMPluginOptionStreaming tests the CSM Plugin Option and labels. It configures the environment for the CSM Plugin Option to read from. It then configures a system with a gRPC Client and gRPC server with the OpenTelemetry Dial and Server Option configured with a CSM Plugin Option with a certain s
(t *testing.T)
| 266 | // provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the |
| 267 | // plugin option should be attached to the relevant metrics. |
| 268 | func (s) TestCSMPluginOptionStreaming(t *testing.T) { |
| 269 | resourceDetectorEmissions := map[string]string{ |
| 270 | "cloud.platform": "gcp_kubernetes_engine", |
| 271 | "cloud.region": "cloud_region_val", // availability_zone isn't present, so this should become location |
| 272 | "cloud.account.id": "cloud_account_id_val", |
| 273 | "k8s.namespace.name": "k8s_namespace_name_val", |
| 274 | "k8s.cluster.name": "k8s_cluster_name_val", |
| 275 | } |
| 276 | const meshID = "mesh_id" |
| 277 | const csmCanonicalServiceName = "csm_canonical_service_name" |
| 278 | const csmWorkloadName = "csm_workload_name" |
| 279 | setupEnv(t, resourceDetectorEmissions, meshID, csmCanonicalServiceName, csmWorkloadName) |
| 280 | |
| 281 | attributesWant := map[string]string{ |
| 282 | "csm.workload_canonical_service": csmCanonicalServiceName, // from env |
| 283 | "csm.mesh_id": "mesh_id", // from bootstrap env var |
| 284 | |
| 285 | // No xDS Labels - this happens in a test below. |
| 286 | |
| 287 | "csm.remote_workload_type": "gcp_kubernetes_engine", |
| 288 | "csm.remote_workload_canonical_service": csmCanonicalServiceName, |
| 289 | "csm.remote_workload_project_id": "cloud_account_id_val", |
| 290 | "csm.remote_workload_cluster_name": "k8s_cluster_name_val", |
| 291 | "csm.remote_workload_namespace_name": "k8s_namespace_name_val", |
| 292 | "csm.remote_workload_location": "cloud_region_val", |
| 293 | "csm.remote_workload_name": csmWorkloadName, |
| 294 | } |
| 295 | |
| 296 | var csmLabels []attribute.KeyValue |
| 297 | for k, v := range attributesWant { |
| 298 | csmLabels = append(csmLabels, attribute.String(k, v)) |
| 299 | } |
| 300 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 301 | defer cancel() |
| 302 | tests := []struct { |
| 303 | name string |
| 304 | // To test the different operations for Streaming RPC's from the |
| 305 | // interceptor level that can plumb metadata exchange header in. |
| 306 | streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error |
| 307 | opts itestutils.MetricDataOptions |
| 308 | }{ |
| 309 | { |
| 310 | name: "trailers-only", |
| 311 | streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| 312 | for { |
| 313 | if _, err := stream.Recv(); err == io.EOF { |
| 314 | return nil |
| 315 | } |
| 316 | } |
| 317 | }, |
| 318 | opts: itestutils.MetricDataOptions{ |
| 319 | CSMLabels: csmLabels, |
| 320 | }, |
| 321 | }, |
| 322 | { |
| 323 | name: "set-header", |
| 324 | streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| 325 | stream.SetHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) |
nothing calls this directly
no test coverage detected