notifyWaiters checks if any waiters can proceed and notifies them in FIFO order. This is called after every state transition.
()
| 283 | // notifyWaiters checks if any waiters can proceed and notifies them in FIFO order. |
| 284 | // This is called after every state transition. |
| 285 | func (sm *ConnStateMachine) notifyWaiters() { |
| 286 | // Fast path: check atomic counter without acquiring lock |
| 287 | // This eliminates mutex overhead in the common case (no waiters) |
| 288 | if sm.waiterCount.Load() == 0 { |
| 289 | return |
| 290 | } |
| 291 | |
| 292 | sm.mu.Lock() |
| 293 | defer sm.mu.Unlock() |
| 294 | |
| 295 | // Double-check after acquiring lock (waiters might have been processed) |
| 296 | if sm.waiters.Len() == 0 { |
| 297 | return |
| 298 | } |
| 299 | |
| 300 | // Track state locally so we only consider transitions made within this |
| 301 | // call, not concurrent transitions from woken goroutines. Re-reading the |
| 302 | // atomic would let a fast goroutine's Transition(StateIdle) leak into our |
| 303 | // view, causing us to wake multiple waiters at once and breaking FIFO |
| 304 | // execution ordering. |
| 305 | currentState := sm.GetState() |
| 306 | |
| 307 | for { |
| 308 | processed := false |
| 309 | |
| 310 | for elem := sm.waiters.Front(); elem != nil; elem = elem.Next() { |
| 311 | w := elem.Value.(*waiter) |
| 312 | |
| 313 | if _, valid := w.validStates[currentState]; valid { |
| 314 | sm.waiters.Remove(elem) |
| 315 | sm.waiterCount.Add(-1) |
| 316 | |
| 317 | if sm.state.CompareAndSwap(uint32(currentState), uint32(w.targetState)) { |
| 318 | w.done <- nil |
| 319 | currentState = w.targetState |
| 320 | processed = true |
| 321 | break |
| 322 | } else { |
| 323 | sm.waiters.PushFront(w) |
| 324 | sm.waiterCount.Add(1) |
| 325 | currentState = sm.GetState() |
| 326 | processed = true |
| 327 | break |
| 328 | } |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | if !processed { |
| 333 | break |
| 334 | } |
| 335 | } |
| 336 | } |