Handle an async reply from PublishAsync.
(m *Msg)
| 858 | |
| 859 | // Handle an async reply from PublishAsync. |
| 860 | func (js *js) handleAsyncReply(m *Msg) { |
| 861 | if len(m.Subject) <= js.replyPrefixLen { |
| 862 | return |
| 863 | } |
| 864 | id := m.Subject[js.replyPrefixLen:] |
| 865 | |
| 866 | js.mu.Lock() |
| 867 | paf := js.getPAF(id) |
| 868 | if paf == nil { |
| 869 | js.mu.Unlock() |
| 870 | return |
| 871 | } |
| 872 | |
| 873 | closeStc := func() { |
| 874 | // Check on anyone stalled and waiting. |
| 875 | if js.stc != nil && len(js.pafs) < js.opts.maxpa { |
| 876 | close(js.stc) |
| 877 | js.stc = nil |
| 878 | } |
| 879 | } |
| 880 | |
| 881 | closeDchFn := func() func() { |
| 882 | var dch chan struct{} |
| 883 | // Check on anyone waiting on done status. |
| 884 | if js.dch != nil && len(js.pafs) == 0 { |
| 885 | dch = js.dch |
| 886 | js.dch = nil |
| 887 | } |
| 888 | // Return function to close done channel which |
| 889 | // should be deferred so that error is processed and |
| 890 | // can be checked. |
| 891 | return func() { |
| 892 | if dch != nil { |
| 893 | close(dch) |
| 894 | } |
| 895 | } |
| 896 | } |
| 897 | |
| 898 | doErr := func(err error) { |
| 899 | paf.err = err |
| 900 | if paf.errCh != nil { |
| 901 | paf.errCh <- paf.err |
| 902 | } |
| 903 | cb := js.opts.aecb |
| 904 | js.mu.Unlock() |
| 905 | if cb != nil { |
| 906 | cb(paf.js, paf.msg, err) |
| 907 | } |
| 908 | } |
| 909 | |
| 910 | if paf.timeout != nil { |
| 911 | paf.timeout.Stop() |
| 912 | } |
| 913 | |
| 914 | // Process no responders etc. |
| 915 | if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { |
| 916 | if paf.retries < paf.maxRetries { |
| 917 | paf.retries++ |
nothing calls this directly
no test coverage detected