MCPcopy
hub / github.com/nats-io/nats.go / PublishMsg

Method PublishMsg

jetstream/publish.go:168–262  ·  view source on GitHub ↗

PublishMsg performs a synchronous publish to a stream and waits for an ack from the server. It accepts a context, a *nats.Msg (whose subject must be bound to a stream), and optional PublishOpt values.

(ctx context.Context, m *nats.Msg, opts ...PublishOpt)

Source from the content-addressed store, hash-verified

166// ack from server. It accepts subject name (which must be bound to a
167// stream) and nats.Message.
168func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (*PubAck, error) {
169 ctx, cancel := js.wrapContextWithoutDeadline(ctx)
170 if cancel != nil {
171 defer cancel()
172 }
173 o := pubOpts{
174 retryWait: DefaultPubRetryWait,
175 retryAttempts: DefaultPubRetryAttempts,
176 }
177 if len(opts) > 0 {
178 if m.Header == nil {
179 m.Header = nats.Header{}
180 }
181 for _, opt := range opts {
182 if err := opt(&o); err != nil {
183 return nil, err
184 }
185 }
186 }
187 if o.stallWait > 0 {
188 return nil, fmt.Errorf("%w: stall wait cannot be set to sync publish", ErrInvalidOption)
189 }
190
191 if o.id != "" {
192 m.Header.Set(MsgIDHeader, o.id)
193 }
194 if o.lastMsgID != "" {
195 m.Header.Set(ExpectedLastMsgIDHeader, o.lastMsgID)
196 }
197 if o.stream != "" {
198 m.Header.Set(ExpectedStreamHeader, o.stream)
199 }
200 if o.lastSeq != nil {
201 m.Header.Set(ExpectedLastSeqHeader, strconv.FormatUint(*o.lastSeq, 10))
202 }
203 if o.lastSubjectSeq != nil {
204 m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
205 }
206 if o.lastSubject != "" {
207 m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject)
208 m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
209 }
210 if o.ttl > 0 {
211 m.Header.Set(MsgTTLHeader, o.ttl.String())
212 }
213 if o.schedule != "" {
214 m.Header.Set(ScheduleHeader, o.schedule)
215 }
216 if o.scheduleTarget != "" {
217 m.Header.Set(ScheduleTargetHeader, o.scheduleTarget)
218 }
219 if o.scheduleSource != "" {
220 m.Header.Set(ScheduleSourceHeader, o.scheduleSource)
221 }
222 if o.scheduleTTL != "" {
223 m.Header.Set(ScheduleTTLHeader, o.scheduleTTL)
224 }
225 if o.scheduleTZ != "" {

Callers 1

PublishMethod · 0.95

Calls 7

ErrorfMethod · 0.80
RequestMsgWithContextMethod · 0.80
SetMethod · 0.65
DoneMethod · 0.65
StringMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected