MCPcopy
hub / github.com/nats-io/nats.go / PublishMsgAsync

Method PublishMsgAsync

jetstream/publish.go:274–420  ·  view source on GitHub ↗

PublishMsgAsync performs an asynchronous publish to a stream and returns a [PubAckFuture]. It accepts a nats.Msg, whose subject must be bound to a stream, and optional publish options.

(m *nats.Msg, opts ...PublishOpt)

Source from the content-addressed store, hash-verified

272// returns [PubAckFuture] interface. It accepts subject name (which must
273// be bound to a stream) and nats.Message.
274func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) {
275 o := pubOpts{
276 retryWait: DefaultPubRetryWait,
277 retryAttempts: DefaultPubRetryAttempts,
278 }
279 if len(opts) > 0 {
280 if m.Header == nil {
281 m.Header = nats.Header{}
282 }
283 for _, opt := range opts {
284 if err := opt(&o); err != nil {
285 return nil, err
286 }
287 }
288 }
289 defaultStallWait := 200 * time.Millisecond
290
291 stallWait := defaultStallWait
292 if o.stallWait > 0 {
293 stallWait = o.stallWait
294 }
295
296 if o.id != "" {
297 m.Header.Set(MsgIDHeader, o.id)
298 }
299 if o.lastMsgID != "" {
300 m.Header.Set(ExpectedLastMsgIDHeader, o.lastMsgID)
301 }
302 if o.stream != "" {
303 m.Header.Set(ExpectedStreamHeader, o.stream)
304 }
305 if o.lastSeq != nil {
306 m.Header.Set(ExpectedLastSeqHeader, strconv.FormatUint(*o.lastSeq, 10))
307 }
308 if o.lastSubjectSeq != nil {
309 m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
310 }
311 if o.lastSubject != "" {
312 m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject)
313 m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
314 }
315 if o.ttl > 0 {
316 m.Header.Set(MsgTTLHeader, o.ttl.String())
317 }
318 if o.schedule != "" {
319 m.Header.Set(ScheduleHeader, o.schedule)
320 }
321 if o.scheduleTarget != "" {
322 m.Header.Set(ScheduleTargetHeader, o.scheduleTarget)
323 }
324 if o.scheduleSource != "" {
325 m.Header.Set(ScheduleSourceHeader, o.scheduleSource)
326 }
327 if o.scheduleTTL != "" {
328 m.Header.Set(ScheduleTTLHeader, o.scheduleTTL)
329 }
330 if o.scheduleTZ != "" {
331 m.Header.Set(ScheduleTimeZoneHeader, o.scheduleTZ)

Callers 2

PublishAsyncMethod · 0.95
handleAsyncReplyMethod · 0.95

Calls 9

newAsyncReplyMethod · 0.95
registerPAFMethod · 0.95
asyncStallMethod · 0.95
clearPAFMethod · 0.95
ErrorfMethod · 0.80
SetMethod · 0.65
ResetMethod · 0.65
PublishMsgMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected