(m *Msg, opts ...PubOpt)
| 1009 | const defaultStallWait = 200 * time.Millisecond |
| 1010 | |
| 1011 | func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { |
| 1012 | var o pubOpts |
| 1013 | if len(opts) > 0 { |
| 1014 | if m.Header == nil { |
| 1015 | m.Header = Header{} |
| 1016 | } |
| 1017 | for _, opt := range opts { |
| 1018 | if err := opt.configurePublish(&o); err != nil { |
| 1019 | return nil, err |
| 1020 | } |
| 1021 | } |
| 1022 | } |
| 1023 | |
| 1024 | if o.rnum < 0 { |
| 1025 | return nil, fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidArg) |
| 1026 | } |
| 1027 | |
| 1028 | // Timeouts and contexts do not make sense for these. |
| 1029 | if o.ttl != 0 || o.ctx != nil { |
| 1030 | return nil, ErrContextAndTimeout |
| 1031 | } |
| 1032 | stallWait := defaultStallWait |
| 1033 | if o.stallWait > 0 { |
| 1034 | stallWait = o.stallWait |
| 1035 | } |
| 1036 | |
| 1037 | // FIXME(dlc) - Make common. |
| 1038 | if o.id != _EMPTY_ { |
| 1039 | m.Header.Set(MsgIdHdr, o.id) |
| 1040 | } |
| 1041 | if o.lid != _EMPTY_ { |
| 1042 | m.Header.Set(ExpectedLastMsgIdHdr, o.lid) |
| 1043 | } |
| 1044 | if o.str != _EMPTY_ { |
| 1045 | m.Header.Set(ExpectedStreamHdr, o.str) |
| 1046 | } |
| 1047 | if o.seq != nil { |
| 1048 | m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10)) |
| 1049 | } |
| 1050 | if o.lss != nil { |
| 1051 | m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) |
| 1052 | } |
| 1053 | if o.msgTTL > 0 { |
| 1054 | m.Header.Set(MsgTTLHdr, o.msgTTL.String()) |
| 1055 | } |
| 1056 | |
| 1057 | // Reply |
| 1058 | paf := o.pafRetry |
| 1059 | if paf == nil && m.Reply != _EMPTY_ { |
| 1060 | return nil, errors.New("nats: reply subject should be empty") |
| 1061 | } |
| 1062 | var id string |
| 1063 | var reply string |
| 1064 | |
| 1065 | // register new paf if not retrying |
| 1066 | if paf == nil { |
| 1067 | reply = js.newAsyncReply() |
| 1068 |
no test coverage detected