| 218 | } |
| 219 | |
| 220 | func (p *picker) Pick(pInfo balancer.PickInfo) (balancer.PickResult, error) { |
| 221 | var pickedEndpointState *endpointState |
| 222 | var pickedEndpointNumRPCs int32 |
| 223 | for i := 0; i < int(p.choiceCount); i++ { |
| 224 | index := randuint32() % uint32(len(p.endpointStates)) |
| 225 | endpointState := p.endpointStates[index] |
| 226 | n := endpointState.numRPCs.Load() |
| 227 | if pickedEndpointState == nil || n < pickedEndpointNumRPCs { |
| 228 | pickedEndpointState = &endpointState |
| 229 | pickedEndpointNumRPCs = n |
| 230 | } |
| 231 | } |
| 232 | result, err := pickedEndpointState.picker.Pick(pInfo) |
| 233 | if err != nil { |
| 234 | return result, err |
| 235 | } |
| 236 | // "The counter for a subchannel should be atomically incremented by one |
| 237 | // after it has been successfully picked by the picker." - A48 |
| 238 | pickedEndpointState.numRPCs.Add(1) |
| 239 | // "the picker should add a callback for atomically decrementing the |
| 240 | // subchannel counter once the RPC finishes (regardless of Status code)." - |
| 241 | // A48. |
| 242 | originalDone := result.Done |
| 243 | result.Done = func(info balancer.DoneInfo) { |
| 244 | pickedEndpointState.numRPCs.Add(-1) |
| 245 | if originalDone != nil { |
| 246 | originalDone(info) |
| 247 | } |
| 248 | } |
| 249 | return result, nil |
| 250 | } |