MCPcopy
hub / github.com/nats-io/nats.go / readLoop

Method readLoop

nats.go:3527–3567  ·  view source on GitHub ↗

readLoop() will sit on the socket reading and processing the protocol from the server. It will dispatch appropriately based on the op type.

()

Source from the content-addressed store, hash-verified

3525// protocol from the server. It will dispatch appropriately based
3526// on the op type.
3527func (nc *Conn) readLoop() {
3528 // Release the wait group on exit
3529 defer nc.wg.Done()
3530
3531 // Create a parseState if needed.
3532 nc.mu.Lock()
3533 if nc.ps == nil {
3534 nc.ps = &parseState{}
3535 }
3536 conn := nc.conn
3537 br := nc.br
3538 nc.mu.Unlock()
3539
3540 if conn == nil {
3541 return
3542 }
3543
3544 for {
3545 buf, err := br.Read()
3546 if err == nil {
3547 // With websocket, it is possible that there is no error but
3548 // also no buffer returned (either WS control message or read of a
3549 // partial compressed message). We could call parse(buf) which
3550 // would ignore an empty buffer, but simply go back to top of the loop.
3551 if len(buf) == 0 {
3552 continue
3553 }
3554 err = nc.parse(buf)
3555 }
3556 if err != nil {
3557 if shouldClose := nc.processOpErr(err, false); shouldClose {
3558 nc.close(CLOSED, true, nil)
3559 }
3560 break
3561 }
3562 }
3563 // Clear the parseState here..
3564 nc.mu.Lock()
3565 nc.ps = nil
3566 nc.mu.Unlock()
3567}
3568
3569// waitForMsgs waits on the conditional shared with readLoop and processMsg.
3570// It is used to deliver messages to asynchronous subscribers.

Callers 1

processConnectInitMethod · 0.95

Calls 5

parseMethod · 0.95
processOpErrMethod · 0.95
closeMethod · 0.95
DoneMethod · 0.65
ReadMethod · 0.45

Tested by

no test coverage detected