MCPcopy
hub / github.com/nats-io/nats.go / flusher

Method flusher

nats.go:3971–4016  ·  view source on GitHub ↗

flusher is a separate goroutine that processes flush requests for the connection's buffered writer. This allows writes to the underlying socket to be coalesced.

()

Source from the content-addressed store, hash-verified

3969// flusher is a separate Go routine that will process flush requests for the write
3970// bufio. This allows coalescing of writes to the underlying socket.
3971func (nc *Conn) flusher() {
3972 // Release the wait group
3973 defer nc.wg.Done()
3974
3975 // snapshot the bw and conn since they can change from underneath of us.
3976 nc.mu.Lock()
3977 bw := nc.bw
3978 conn := nc.conn
3979 fch := nc.fch
3980 nc.mu.Unlock()
3981
3982 if conn == nil || bw == nil {
3983 return
3984 }
3985
3986 for {
3987 if _, ok := <-fch; !ok {
3988 return
3989 }
3990 nc.mu.Lock()
3991
3992 // Check to see if we should bail out.
3993 if !nc.isConnected() || nc.isConnecting() || conn != nc.conn {
3994 nc.mu.Unlock()
3995 return
3996 }
3997 if bw.buffered() > 0 {
3998 if err := bw.flush(); err != nil {
3999 if nc.err == nil {
4000 nc.err = err
4001 }
4002 if asyncErrorCB := nc.Opts.AsyncErrorCB; asyncErrorCB != nil {
4003 nc.ach.push(func() { asyncErrorCB(nc, nil, err) })
4004 }
4005 if nc.Opts.ReconnectOnFlusherError {
4006 nc.mu.Unlock()
4007 if shouldClose := nc.processOpErr(err, true); shouldClose {
4008 nc.close(CLOSED, true, nil)
4009 }
4010 return
4011 }
4012 }
4013 }
4014 nc.mu.Unlock()
4015 }
4016}
4017
4018// processPing will send an immediate pong protocol response to the
4019// server. The server uses this mechanism to detect dead clients.

Callers 1

processConnectInitMethod · 0.95

Calls 8

isConnectedMethod · 0.95
isConnectingMethod · 0.95
processOpErrMethod · 0.95
closeMethod · 0.95
bufferedMethod · 0.80
flushMethod · 0.80
DoneMethod · 0.65
pushMethod · 0.45

Tested by

no test coverage detected