PublishMsgAsync performs an asynchronous publish to a stream and returns a [PubAckFuture] interface. It accepts a *nats.Msg, whose Subject must be bound to a stream, followed by optional PublishOpt values.
(m *nats.Msg, opts ...PublishOpt)
| 272 | // returns [PubAckFuture] interface. It accepts a nats.Msg whose Subject |
| 273 | // must be bound to a stream, followed by optional publish options. |
| 274 | func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) { |
| 275 | o := pubOpts{ |
| 276 | retryWait: DefaultPubRetryWait, |
| 277 | retryAttempts: DefaultPubRetryAttempts, |
| 278 | } |
| 279 | if len(opts) > 0 { |
| 280 | if m.Header == nil { |
| 281 | m.Header = nats.Header{} |
| 282 | } |
| 283 | for _, opt := range opts { |
| 284 | if err := opt(&o); err != nil { |
| 285 | return nil, err |
| 286 | } |
| 287 | } |
| 288 | } |
| 289 | defaultStallWait := 200 * time.Millisecond |
| 290 | |
| 291 | stallWait := defaultStallWait |
| 292 | if o.stallWait > 0 { |
| 293 | stallWait = o.stallWait |
| 294 | } |
| 295 | |
| 296 | if o.id != "" { |
| 297 | m.Header.Set(MsgIDHeader, o.id) |
| 298 | } |
| 299 | if o.lastMsgID != "" { |
| 300 | m.Header.Set(ExpectedLastMsgIDHeader, o.lastMsgID) |
| 301 | } |
| 302 | if o.stream != "" { |
| 303 | m.Header.Set(ExpectedStreamHeader, o.stream) |
| 304 | } |
| 305 | if o.lastSeq != nil { |
| 306 | m.Header.Set(ExpectedLastSeqHeader, strconv.FormatUint(*o.lastSeq, 10)) |
| 307 | } |
| 308 | if o.lastSubjectSeq != nil { |
| 309 | m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) |
| 310 | } |
| 311 | if o.lastSubject != "" { |
| 312 | m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject) |
| 313 | m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) |
| 314 | } |
| 315 | if o.ttl > 0 { |
| 316 | m.Header.Set(MsgTTLHeader, o.ttl.String()) |
| 317 | } |
| 318 | if o.schedule != "" { |
| 319 | m.Header.Set(ScheduleHeader, o.schedule) |
| 320 | } |
| 321 | if o.scheduleTarget != "" { |
| 322 | m.Header.Set(ScheduleTargetHeader, o.scheduleTarget) |
| 323 | } |
| 324 | if o.scheduleSource != "" { |
| 325 | m.Header.Set(ScheduleSourceHeader, o.scheduleSource) |
| 326 | } |
| 327 | if o.scheduleTTL != "" { |
| 328 | m.Header.Set(ScheduleTTLHeader, o.scheduleTTL) |
| 329 | } |
| 330 | if o.scheduleTZ != "" { |
| 331 | m.Header.Set(ScheduleTimeZoneHeader, o.scheduleTZ) |
no test coverage detected