MCPcopy
hub / github.com/nats-io/nats.go / PublishMsg

Method PublishMsg

js.go:534–626  ·  view source on GitHub ↗

PublishMsg publishes a Msg to a stream from JetStream.

(m *Msg, opts ...PubOpt)

Source from the content-addressed store, hash-verified

532
533// PublishMsg publishes a Msg to a stream from JetStream.
534func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
535 var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
536 if len(opts) > 0 {
537 if m.Header == nil {
538 m.Header = Header{}
539 }
540 for _, opt := range opts {
541 if err := opt.configurePublish(&o); err != nil {
542 return nil, err
543 }
544 }
545 }
546 // Check for option collisions. Right now just timeout and context.
547 if o.ctx != nil && o.ttl != 0 {
548 return nil, ErrContextAndTimeout
549 }
550 if o.ttl == 0 && o.ctx == nil {
551 o.ttl = js.opts.wait
552 }
553 if o.stallWait > 0 {
554 return nil, errors.New("nats: stall wait cannot be set to sync publish")
555 }
556
557 if o.id != _EMPTY_ {
558 m.Header.Set(MsgIdHdr, o.id)
559 }
560 if o.lid != _EMPTY_ {
561 m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
562 }
563 if o.str != _EMPTY_ {
564 m.Header.Set(ExpectedStreamHdr, o.str)
565 }
566 if o.seq != nil {
567 m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
568 }
569 if o.lss != nil {
570 m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
571 }
572 if o.msgTTL > 0 {
573 m.Header.Set(MsgTTLHdr, o.msgTTL.String())
574 }
575
576 var resp *Msg
577 var err error
578
579 if o.ttl > 0 {
580 resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
581 } else {
582 resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
583 }
584
585 if err != nil {
586 for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
587 // To protect against small blips in leadership changes etc, if we get a no responders here retry.
588 if o.ctx != nil {
589 select {
590 case <-o.ctx.Done():
591 case <-time.After(o.rwait):

Callers 1

PublishMethod · 0.95

Calls 8

RequestMsgMethod · 0.80
DurationMethod · 0.80
RequestMsgWithContextMethod · 0.80
configurePublishMethod · 0.65
SetMethod · 0.65
DoneMethod · 0.65
StringMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected