MCPcopy
hub / github.com/grpc/grpc-go / Pick

Method Pick

balancer/rls/picker.go:82–165  ·  view source on GitHub ↗

Pick makes the routing decision for every outbound RPC.

(info balancer.PickInfo)

Source from the content-addressed store, hash-verified

80
81// Pick makes the routing decision for every outbound RPC.
82func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
83 if name := info.FullMethodName; !isFullMethodNameValid(name) {
84 return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method", name)
85 }
86
87 // Build the request's keys using the key builders from LB config.
88 md, _ := metadata.FromOutgoingContext(info.Ctx)
89 reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)
90
91 p.lb.cacheMu.Lock()
92 var pr balancer.PickResult
93 var err error
94
95 // Record metrics without the cache mutex held, to prevent lock contention
96 // between concurrent RPCs and their Pick calls. Metrics recording can
97 // potentially be expensive.
98 metricsCallback := func() {}
99 defer func() {
100 p.lb.cacheMu.Unlock()
101 metricsCallback()
102 }()
103
104 // Lookup data cache and pending request map using request path and keys.
105 cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
106 dcEntry := p.lb.dataCache.getEntry(cacheKey)
107 pendingEntry := p.lb.pendingMap[cacheKey]
108 now := time.Now()
109
110 switch {
111 // No data cache entry. No pending request.
112 case dcEntry == nil && pendingEntry == nil:
113 throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
114 if throttled {
115 pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
116 return pr, err
117 }
118 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
119
120 // No data cache entry. Pending request exists.
121 case dcEntry == nil && pendingEntry != nil:
122 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
123
124 // Data cache hit. No pending request.
125 case dcEntry != nil && pendingEntry == nil:
126 if dcEntry.expiryTime.After(now) {
127 if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
128 p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
129 }
130 // Delegate to child policies.
131 pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
132 return pr, err
133 }
134
135 // We get here only if the data cache entry has expired. If entry is in
136 // backoff, delegate to default target or fail the pick.
137 if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
138 // Avoid propagating the status code received on control plane RPCs to the
139 // data plane which can lead to unexpected outcomes as we do not control

Callers

nothing calls this directly

Calls 13

FromOutgoingContext · Function · 0.92
Error · Function · 0.92
isFullMethodNameValid · Function · 0.85
RLSKey · Method · 0.80
getEntry · Method · 0.80
Now · Method · 0.80
Errorf · Method · 0.65
Error · Method · 0.65
Lock · Method · 0.45

Tested by

no test coverage detected