queueOut attempts to send a ServerComMessage to a session write loop; it fails, if the send buffer is full.
(msg *ServerComMessage)
| 312 | // queueOut attempts to send a ServerComMessage to a session write loop; |
| 313 | // it fails, if the send buffer is full. |
| 314 | func (s *Session) queueOut(msg *ServerComMessage) bool { |
| 315 | if s == nil { |
| 316 | return true |
| 317 | } |
| 318 | if atomic.LoadInt32(&s.terminating) > 0 { |
| 319 | return true |
| 320 | } |
| 321 | |
| 322 | if s.multi != nil { |
| 323 | // In case of a cluster we need to pass a copy of the actual session. |
| 324 | msg.sess = s |
| 325 | if s.multi.queueOut(msg) { |
| 326 | s.multi.scheduleClusterWriteLoop() |
| 327 | return true |
| 328 | } |
| 329 | return false |
| 330 | } |
| 331 | |
| 332 | // Record latency only on {ctrl} messages and end-user sessions. |
| 333 | if msg.Ctrl != nil && msg.Id != "" { |
| 334 | if !msg.Ctrl.Timestamp.IsZero() && !s.isCluster() { |
| 335 | duration := time.Since(msg.Ctrl.Timestamp).Milliseconds() |
| 336 | statsAddHistSample("RequestLatency", float64(duration)) |
| 337 | } |
| 338 | if 200 <= msg.Ctrl.Code && msg.Ctrl.Code < 600 { |
| 339 | statsInc(fmt.Sprintf("CtrlCodesTotal%dxx", msg.Ctrl.Code/100), 1) |
| 340 | } else { |
| 341 | logs.Warn.Println("Invalid response code: ", msg.Ctrl.Code) |
| 342 | } |
| 343 | } |
| 344 | |
| 345 | select { |
| 346 | case s.send <- msg: |
| 347 | default: |
| 348 | // Never block here since it may also block the topic's run() goroutine. |
| 349 | logs.Err.Println("s.queueOut: session's send queue full", s.sid) |
| 350 | return false |
| 351 | } |
| 352 | if s.isMultiplex() { |
| 353 | s.scheduleClusterWriteLoop() |
| 354 | } |
| 355 | return true |
| 356 | } |
| 357 | |
| 358 | // queueOutBytes attempts to send a ServerComMessage already serialized to []byte. |
| 359 | // If the send buffer is full, it fails. |
no test coverage detected