Tests that RPCs are routed according to endpoint hash key rather than endpoint first address if it is set in EDS endpoint metadata.
(t *testing.T)
| 2558 | // Tests that RPCs are routed according to endpoint hash key rather than |
| 2559 | // endpoint first address if it is set in EDS endpoint metadata. |
| 2560 | func (s) TestRingHash_EndpointHashKey(t *testing.T) { |
| 2561 | testutils.SetEnvConfig(t, &envconfig.XDSEndpointHashKeyBackwardCompat, false) |
| 2562 | |
| 2563 | backends := backendAddrs(startTestServiceBackends(t, 4)) |
| 2564 | |
| 2565 | const clusterName = "cluster" |
| 2566 | var backendOpts []e2e.BackendOptions |
| 2567 | for i, addr := range backends { |
| 2568 | var ports []uint32 |
| 2569 | ports = append(ports, testutils.ParsePort(t, addr)) |
| 2570 | backendOpts = append(backendOpts, e2e.BackendOptions{ |
| 2571 | Ports: ports, |
| 2572 | Metadata: map[string]any{"hash_key": strconv.Itoa(i)}, |
| 2573 | }) |
| 2574 | } |
| 2575 | endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ |
| 2576 | ClusterName: clusterName, |
| 2577 | Host: "localhost", |
| 2578 | Localities: []e2e.LocalityOptions{{ |
| 2579 | Backends: backendOpts, |
| 2580 | Weight: 1, |
| 2581 | }}, |
| 2582 | }) |
| 2583 | cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ |
| 2584 | ClusterName: clusterName, |
| 2585 | ServiceName: clusterName, |
| 2586 | Policy: e2e.LoadBalancingPolicyRingHash, |
| 2587 | }) |
| 2588 | route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") |
| 2589 | listener := e2e.DefaultClientListener(virtualHostName, route.Name) |
| 2590 | |
| 2591 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 2592 | defer cancel() |
| 2593 | |
| 2594 | xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) |
| 2595 | if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { |
| 2596 | t.Fatalf("Failed to update xDS resources: %v", err) |
| 2597 | } |
| 2598 | |
| 2599 | opts := []grpc.DialOption{ |
| 2600 | grpc.WithResolvers(xdsResolver), |
| 2601 | grpc.WithTransportCredentials(insecure.NewCredentials()), |
| 2602 | } |
| 2603 | conn, err := grpc.NewClient("xds:///test.server", opts...) |
| 2604 | if err != nil { |
| 2605 | t.Fatalf("Failed to create client: %s", err) |
| 2606 | } |
| 2607 | defer conn.Close() |
| 2608 | client := testgrpc.NewTestServiceClient(conn) |
| 2609 | |
| 2610 | // Make sure RPCs are routed to backends according to the endpoint metadata |
| 2611 | // rather than their address. Note each type of RPC contains a header value |
| 2612 | // that will always be hashed to a specific backend as the header value |
| 2613 | // matches the endpoint metadata hash key. |
| 2614 | for i, backend := range backends { |
| 2615 | ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", strconv.Itoa(i)+"_0")) |
| 2616 | numRPCs := 10 |
| 2617 | reqPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) |
nothing calls this directly
no test coverage detected