PublishMsg performs a synchronous publish to a stream and waits for ack from server. It accepts subject name (which must be bound to a stream) and nats.Message.
(ctx context.Context, m *nats.Msg, opts ...PublishOpt)
| 166 | // ack from server. It accepts subject name (which must be bound to a |
| 167 | // stream) and nats.Message. |
| 168 | func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (*PubAck, error) { |
| 169 | ctx, cancel := js.wrapContextWithoutDeadline(ctx) |
| 170 | if cancel != nil { |
| 171 | defer cancel() |
| 172 | } |
| 173 | o := pubOpts{ |
| 174 | retryWait: DefaultPubRetryWait, |
| 175 | retryAttempts: DefaultPubRetryAttempts, |
| 176 | } |
| 177 | if len(opts) > 0 { |
| 178 | if m.Header == nil { |
| 179 | m.Header = nats.Header{} |
| 180 | } |
| 181 | for _, opt := range opts { |
| 182 | if err := opt(&o); err != nil { |
| 183 | return nil, err |
| 184 | } |
| 185 | } |
| 186 | } |
| 187 | if o.stallWait > 0 { |
| 188 | return nil, fmt.Errorf("%w: stall wait cannot be set to sync publish", ErrInvalidOption) |
| 189 | } |
| 190 | |
| 191 | if o.id != "" { |
| 192 | m.Header.Set(MsgIDHeader, o.id) |
| 193 | } |
| 194 | if o.lastMsgID != "" { |
| 195 | m.Header.Set(ExpectedLastMsgIDHeader, o.lastMsgID) |
| 196 | } |
| 197 | if o.stream != "" { |
| 198 | m.Header.Set(ExpectedStreamHeader, o.stream) |
| 199 | } |
| 200 | if o.lastSeq != nil { |
| 201 | m.Header.Set(ExpectedLastSeqHeader, strconv.FormatUint(*o.lastSeq, 10)) |
| 202 | } |
| 203 | if o.lastSubjectSeq != nil { |
| 204 | m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) |
| 205 | } |
| 206 | if o.lastSubject != "" { |
| 207 | m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject) |
| 208 | m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) |
| 209 | } |
| 210 | if o.ttl > 0 { |
| 211 | m.Header.Set(MsgTTLHeader, o.ttl.String()) |
| 212 | } |
| 213 | if o.schedule != "" { |
| 214 | m.Header.Set(ScheduleHeader, o.schedule) |
| 215 | } |
| 216 | if o.scheduleTarget != "" { |
| 217 | m.Header.Set(ScheduleTargetHeader, o.scheduleTarget) |
| 218 | } |
| 219 | if o.scheduleSource != "" { |
| 220 | m.Header.Set(ScheduleSourceHeader, o.scheduleSource) |
| 221 | } |
| 222 | if o.scheduleTTL != "" { |
| 223 | m.Header.Set(ScheduleTTLHeader, o.scheduleTTL) |
| 224 | } |
| 225 | if o.scheduleTZ != "" { |
no test coverage detected