MCPcopy
hub / github.com/nats-io/nats.go / PublishMsgAsync

Method PublishMsgAsync

js.go:1011–1140  ·  js.go::js.PublishMsgAsync
(m *Msg, opts ...PubOpt)

Source from the content-addressed store, hash-verified

1009const defaultStallWait = 200 * time.Millisecond
1010
1011func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
1012 var o pubOpts
1013 if len(opts) > 0 {
1014 if m.Header == nil {
1015 m.Header = Header{}
1016 }
1017 for _, opt := range opts {
1018 if err := opt.configurePublish(&o); err != nil {
1019 return nil, err
1020 }
1021 }
1022 }
1023
1024 if o.rnum < 0 {
1025 return nil, fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidArg)
1026 }
1027
1028 // Timeouts and contexts do not make sense for these.
1029 if o.ttl != 0 || o.ctx != nil {
1030 return nil, ErrContextAndTimeout
1031 }
1032 stallWait := defaultStallWait
1033 if o.stallWait > 0 {
1034 stallWait = o.stallWait
1035 }
1036
1037 // FIXME(dlc) - Make common.
1038 if o.id != _EMPTY_ {
1039 m.Header.Set(MsgIdHdr, o.id)
1040 }
1041 if o.lid != _EMPTY_ {
1042 m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
1043 }
1044 if o.str != _EMPTY_ {
1045 m.Header.Set(ExpectedStreamHdr, o.str)
1046 }
1047 if o.seq != nil {
1048 m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
1049 }
1050 if o.lss != nil {
1051 m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
1052 }
1053 if o.msgTTL > 0 {
1054 m.Header.Set(MsgTTLHdr, o.msgTTL.String())
1055 }
1056
1057 // Reply
1058 paf := o.pafRetry
1059 if paf == nil && m.Reply != _EMPTY_ {
1060 return nil, errors.New("nats: reply subject should be empty")
1061 }
1062 var id string
1063 var reply string
1064
1065 // register new paf if not retrying
1066 if paf == nil {
1067 reply = js.newAsyncReply()
1068

Callers 2

handleAsyncReplyMethod · 0.95
PublishAsyncMethod · 0.95

Calls 11

newAsyncReplyMethod · 0.95
registerPAFMethod · 0.95
asyncStallMethod · 0.95
clearPAFMethod · 0.95
ErrorfMethod · 0.80
headerBytesMethod · 0.80
publishMethod · 0.80
configurePublishMethod · 0.65
SetMethod · 0.65
ResetMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected