queueHandoff queues a handoff request for processing if err is returned, connection will be removed from pool
(conn *pool.Conn)
| 265 | // queueHandoff queues a handoff request for processing |
| 266 | // if err is returned, connection will be removed from pool |
| 267 | func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error { |
| 268 | // Get handoff info atomically to prevent race conditions |
| 269 | shouldHandoff, endpoint, seqID := conn.GetHandoffInfo() |
| 270 | |
| 271 | // on retries the connection will not be marked for handoff, but it will have retries > 0 |
| 272 | // if shouldHandoff is false and retries is 0, then we are not retrying and not do a handoff |
| 273 | if !shouldHandoff && conn.HandoffRetries() == 0 { |
| 274 | if internal.LogLevel.InfoOrAbove() { |
| 275 | internal.Logger.Printf(context.Background(), logs.ConnectionNotMarkedForHandoff(conn.GetID())) |
| 276 | } |
| 277 | return errors.New(logs.ConnectionNotMarkedForHandoffError(conn.GetID())) |
| 278 | } |
| 279 | |
| 280 | // Create handoff request with atomically retrieved data |
| 281 | request := HandoffRequest{ |
| 282 | Conn: conn, |
| 283 | ConnID: conn.GetID(), |
| 284 | Endpoint: endpoint, |
| 285 | SeqID: seqID, |
| 286 | Pool: hwm.poolHook.pool, // Include pool for connection removal on failure |
| 287 | } |
| 288 | |
| 289 | select { |
| 290 | // priority to shutdown |
| 291 | case <-hwm.shutdown: |
| 292 | return ErrShutdown |
| 293 | default: |
| 294 | select { |
| 295 | case <-hwm.shutdown: |
| 296 | return ErrShutdown |
| 297 | case hwm.handoffQueue <- request: |
| 298 | // Store in pending map |
| 299 | hwm.pending.Store(request.ConnID, request.SeqID) |
| 300 | // Ensure we have a worker to process this request |
| 301 | hwm.ensureWorkerAvailable() |
| 302 | return nil |
| 303 | default: |
| 304 | select { |
| 305 | case <-hwm.shutdown: |
| 306 | return ErrShutdown |
| 307 | case hwm.handoffQueue <- request: |
| 308 | // Store in pending map |
| 309 | hwm.pending.Store(request.ConnID, request.SeqID) |
| 310 | // Ensure we have a worker to process this request |
| 311 | hwm.ensureWorkerAvailable() |
| 312 | return nil |
| 313 | case <-time.After(100 * time.Millisecond): // give workers a chance to process |
| 314 | // Queue is full - log and attempt scaling |
| 315 | queueLen := len(hwm.handoffQueue) |
| 316 | queueCap := cap(hwm.handoffQueue) |
| 317 | if internal.LogLevel.WarnOrAbove() { |
| 318 | internal.Logger.Printf(context.Background(), logs.HandoffQueueFull(queueLen, queueCap)) |
| 319 | } |
| 320 | } |
| 321 | } |
| 322 | } |
| 323 | |
| 324 | // Ensure we have workers available to handle the load |
no test coverage detected