RouteAndProcess routes the incoming RPC to a configured route in the route table and also processes the RPC by running the incoming RPC through any HTTP Filters configured.
(ctx context.Context)
| 37 | // table and also processes the RPC by running the incoming RPC through any HTTP |
| 38 | // Filters configured. |
| 39 | func RouteAndProcess(ctx context.Context) error { |
| 40 | conn := transport.GetConnection(ctx) |
| 41 | cw, ok := conn.(*connWrapper) |
| 42 | if !ok { |
| 43 | return errors.New("missing virtual hosts in incoming context") |
| 44 | } |
| 45 | |
| 46 | rc := cw.urc.Load() |
| 47 | // Error out at the L7 routing level with status code UNAVAILABLE. This |
| 48 | // represents a NACK before a usable route configuration, an RDS resource |
| 49 | // not found, or an error combining LDS + RDS (shouldn't happen). |
| 50 | if rc.err != nil { |
| 51 | if logger.V(2) { |
| 52 | logger.Infof("RPC on connection with xDS Configuration error: %v", rc.err) |
| 53 | } |
| 54 | return status.Error(codes.Unavailable, fmt.Sprintf("error from xDS configuration for matched route configuration: %v", rc.err)) |
| 55 | } |
| 56 | |
| 57 | mn, ok := grpc.Method(ctx) |
| 58 | if !ok { |
| 59 | return errors.New("missing method name in incoming context") |
| 60 | } |
| 61 | md, ok := metadata.FromIncomingContext(ctx) |
| 62 | if !ok { |
| 63 | return errors.New("missing metadata in incoming context") |
| 64 | } |
| 65 | // A41 added logic to the core grpc implementation to guarantee that once |
| 66 | // the RPC gets to this point, there will be a single, unambiguous authority |
| 67 | // present in the header map. |
| 68 | authority := md.Get(":authority") |
| 69 | // authority[0] is safe because of the guarantee mentioned above. |
| 70 | vh := findBestMatchingVirtualHostServer(authority[0], rc.vhs) |
| 71 | if vh == nil { |
| 72 | return rc.statusErrWithNodeID(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host") |
| 73 | } |
| 74 | |
| 75 | var rwi *routeWithInterceptors |
| 76 | rpcInfo := iresolver.RPCInfo{ |
| 77 | Context: ctx, |
| 78 | Method: mn, |
| 79 | } |
| 80 | for _, r := range vh.routes { |
| 81 | if r.matcher.Match(rpcInfo) { |
| 82 | // "NonForwardingAction is expected for all Routes used on |
| 83 | // server-side; a route with an inappropriate action causes RPCs |
| 84 | // matching that route to fail with UNAVAILABLE." - A36 |
| 85 | if r.actionType != xdsresource.RouteActionNonForwardingAction { |
| 86 | return rc.statusErrWithNodeID(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding") |
| 87 | } |
| 88 | rwi = &r |
| 89 | break |
| 90 | } |
| 91 | } |
| 92 | if rwi == nil { |
| 93 | return rc.statusErrWithNodeID(codes.Unavailable, "the incoming RPC did not match a configured Route") |
| 94 | } |
| 95 | if err := rwi.interceptor.AllowRPC(ctx); err != nil { |
| 96 | return rc.statusErrWithNodeID(codes.PermissionDenied, "Incoming RPC is not allowed: %v", err) |
no test coverage detected