MCPcopy
hub / github.com/nats-io/nats.go / handleAsyncReply

Method handleAsyncReply

js.go:860–969  ·  view source on GitHub ↗

Handle an async reply from PublishAsync.

(m *Msg)

Source from the content-addressed store, hash-verified

858
859// Handle an async reply from PublishAsync.
860func (js *js) handleAsyncReply(m *Msg) {
861 if len(m.Subject) <= js.replyPrefixLen {
862 return
863 }
864 id := m.Subject[js.replyPrefixLen:]
865
866 js.mu.Lock()
867 paf := js.getPAF(id)
868 if paf == nil {
869 js.mu.Unlock()
870 return
871 }
872
873 closeStc := func() {
874 // Check on anyone stalled and waiting.
875 if js.stc != nil && len(js.pafs) < js.opts.maxpa {
876 close(js.stc)
877 js.stc = nil
878 }
879 }
880
881 closeDchFn := func() func() {
882 var dch chan struct{}
883 // Check on anyone one waiting on done status.
884 if js.dch != nil && len(js.pafs) == 0 {
885 dch = js.dch
886 js.dch = nil
887 }
888 // Return function to close done channel which
889 // should be deferred so that error is processed and
890 // can be checked.
891 return func() {
892 if dch != nil {
893 close(dch)
894 }
895 }
896 }
897
898 doErr := func(err error) {
899 paf.err = err
900 if paf.errCh != nil {
901 paf.errCh <- paf.err
902 }
903 cb := js.opts.aecb
904 js.mu.Unlock()
905 if cb != nil {
906 cb(paf.js, paf.msg, err)
907 }
908 }
909
910 if paf.timeout != nil {
911 paf.timeout.Stop()
912 }
913
914 // Process no responders etc.
915 if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
916 if paf.retries < paf.maxRetries {
917 paf.retries++

Callers

nothing calls this directly

Calls 5

getPAFMethod · 0.95
PublishMsgAsyncMethod · 0.95
pubOptFnFuncType · 0.85
StopMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected