MCPcopy
hub / github.com/redis/go-redis / queueHandoff

Method queueHandoff

maintnotifications/handoff_worker.go:267–327  ·  view source on GitHub ↗

queueHandoff queues a handoff request for processing if err is returned, connection will be removed from pool

(conn *pool.Conn)

Source from the content-addressed store, hash-verified

265// queueHandoff queues a handoff request for processing
266// if err is returned, connection will be removed from pool
267func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error {
268 // Get handoff info atomically to prevent race conditions
269 shouldHandoff, endpoint, seqID := conn.GetHandoffInfo()
270
271 // on retries the connection will not be marked for handoff, but it will have retries > 0
272 // if shouldHandoff is false and retries is 0, then we are not retrying and not do a handoff
273 if !shouldHandoff && conn.HandoffRetries() == 0 {
274 if internal.LogLevel.InfoOrAbove() {
275 internal.Logger.Printf(context.Background(), logs.ConnectionNotMarkedForHandoff(conn.GetID()))
276 }
277 return errors.New(logs.ConnectionNotMarkedForHandoffError(conn.GetID()))
278 }
279
280 // Create handoff request with atomically retrieved data
281 request := HandoffRequest{
282 Conn: conn,
283 ConnID: conn.GetID(),
284 Endpoint: endpoint,
285 SeqID: seqID,
286 Pool: hwm.poolHook.pool, // Include pool for connection removal on failure
287 }
288
289 select {
290 // priority to shutdown
291 case <-hwm.shutdown:
292 return ErrShutdown
293 default:
294 select {
295 case <-hwm.shutdown:
296 return ErrShutdown
297 case hwm.handoffQueue <- request:
298 // Store in pending map
299 hwm.pending.Store(request.ConnID, request.SeqID)
300 // Ensure we have a worker to process this request
301 hwm.ensureWorkerAvailable()
302 return nil
303 default:
304 select {
305 case <-hwm.shutdown:
306 return ErrShutdown
307 case hwm.handoffQueue <- request:
308 // Store in pending map
309 hwm.pending.Store(request.ConnID, request.SeqID)
310 // Ensure we have a worker to process this request
311 hwm.ensureWorkerAvailable()
312 return nil
313 case <-time.After(100 * time.Millisecond): // give workers a chance to process
314 // Queue is full - log and attempt scaling
315 queueLen := len(hwm.handoffQueue)
316 queueCap := cap(hwm.handoffQueue)
317 if internal.LogLevel.WarnOrAbove() {
318 internal.Logger.Printf(context.Background(), logs.HandoffQueueFull(queueLen, queueCap))
319 }
320 }
321 }
322 }
323
324 // Ensure we have workers available to handle the load

Callers 3

processHandoffRequestMethod · 0.95
OnPutMethod · 0.80
handleMovingMethod · 0.80

Calls 10

ensureWorkerAvailableMethod · 0.95
HandoffQueueFullFunction · 0.92
GetHandoffInfoMethod · 0.80
HandoffRetriesMethod · 0.80
InfoOrAboveMethod · 0.80
GetIDMethod · 0.80
WarnOrAboveMethod · 0.80
PrintfMethod · 0.65

Tested by

no test coverage detected