MCPcopy
hub / github.com/grpc/grpc-go / RouteAndProcess

Function RouteAndProcess

internal/xds/server/routing.go:39–99  ·  view source on GitHub ↗

RouteAndProcess matches the incoming RPC to a configured route in the route table, then processes the RPC by running it through any configured HTTP filters.

(ctx context.Context)

Source from the content-addressed store, hash-verified

37// table and also processes the RPC by running the incoming RPC through any HTTP
38// Filters configured.
39func RouteAndProcess(ctx context.Context) error {
40 conn := transport.GetConnection(ctx)
41 cw, ok := conn.(*connWrapper)
42 if !ok {
43 return errors.New("missing virtual hosts in incoming context")
44 }
45
46 rc := cw.urc.Load()
47 // Error out at routing l7 level with a status code UNAVAILABLE, represents
48 // an nack before usable route configuration or resource not found for RDS
49 // or error combining LDS + RDS (Shouldn't happen).
50 if rc.err != nil {
51 if logger.V(2) {
52 logger.Infof("RPC on connection with xDS Configuration error: %v", rc.err)
53 }
54 return status.Error(codes.Unavailable, fmt.Sprintf("error from xDS configuration for matched route configuration: %v", rc.err))
55 }
56
57 mn, ok := grpc.Method(ctx)
58 if !ok {
59 return errors.New("missing method name in incoming context")
60 }
61 md, ok := metadata.FromIncomingContext(ctx)
62 if !ok {
63 return errors.New("missing metadata in incoming context")
64 }
65 // A41 added logic to the core grpc implementation to guarantee that once
66 // the RPC gets to this point, there will be a single, unambiguous authority
67 // present in the header map.
68 authority := md.Get(":authority")
69 // authority[0] is safe because of the guarantee mentioned above.
70 vh := findBestMatchingVirtualHostServer(authority[0], rc.vhs)
71 if vh == nil {
72 return rc.statusErrWithNodeID(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host")
73 }
74
75 var rwi *routeWithInterceptors
76 rpcInfo := iresolver.RPCInfo{
77 Context: ctx,
78 Method: mn,
79 }
80 for _, r := range vh.routes {
81 if r.matcher.Match(rpcInfo) {
82 // "NonForwardingAction is expected for all Routes used on
83 // server-side; a route with an inappropriate action causes RPCs
84 // matching that route to fail with UNAVAILABLE." - A36
85 if r.actionType != xdsresource.RouteActionNonForwardingAction {
86 return rc.statusErrWithNodeID(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding")
87 }
88 rwi = &r
89 break
90 }
91 }
92 if rwi == nil {
93 return rc.statusErrWithNodeID(codes.Unavailable, "the incoming RPC did not match a configured Route")
94 }
95 if err := rwi.interceptor.AllowRPC(ctx); err != nil {
96 return rc.statusErrWithNodeID(codes.PermissionDenied, "Incoming RPC is not allowed: %v", err)

Callers 2

xdsUnaryInterceptorFunction · 0.92
xdsStreamInterceptorFunction · 0.92

Calls 12

GetConnectionFunction · 0.92
ErrorFunction · 0.92
MethodFunction · 0.92
FromIncomingContextFunction · 0.92
statusErrWithNodeIDMethod · 0.80
VMethod · 0.65
InfofMethod · 0.65
GetMethod · 0.65
MatchMethod · 0.65
AllowRPCMethod · 0.65
LoadMethod · 0.45

Tested by

no test coverage detected