MCPcopy
hub / github.com/nats-io/nats.go / ackReply

Method ackReply

jetstream/message.go:399–446  ·  view source on GitHub ↗
(ctx context.Context, ackType ackType, sync bool, opts ackOpts)

Source from the content-addressed store, hash-verified

397}
398
399func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool, opts ackOpts) error {
400 err := m.checkReply()
401 if err != nil {
402 return err
403 }
404
405 m.Lock()
406 if m.ackd {
407 m.Unlock()
408 return ErrMsgAlreadyAckd
409 }
410 m.Unlock()
411
412 if sync {
413 var cancel context.CancelFunc
414 ctx, cancel = m.js.wrapContextWithoutDeadline(ctx)
415 if cancel != nil {
416 defer cancel()
417 }
418 }
419
420 var body []byte
421 if opts.nakDelay > 0 {
422 body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, opts.nakDelay.Nanoseconds()))
423 } else if opts.termReason != "" {
424 body = []byte(fmt.Sprintf("%s %s", ackType, opts.termReason))
425 } else {
426 body = ackType
427 }
428
429 if sync {
430 _, err = m.js.conn.RequestWithContext(ctx, m.msg.Reply, body)
431 } else {
432 err = m.js.conn.Publish(m.msg.Reply, body)
433 }
434 if err != nil {
435 return err
436 }
437
438 // Mark that the message has been acked unless it is ackProgress
439 // which can be sent many times.
440 if !bytes.Equal(ackType, ackProgress) {
441 m.Lock()
442 m.ackd = true
443 m.Unlock()
444 }
445 return nil
446}
447
448func (m *jetStreamMsg) checkReply() error {
449 if m == nil || m.msg.Sub == nil {

Callers 7

AckMethod · 0.95
DoubleAckMethod · 0.95
NakMethod · 0.95
NakWithDelayMethod · 0.95
InProgressMethod · 0.95
TermMethod · 0.95
TermWithReasonMethod · 0.95

Calls 5

checkReplyMethod · 0.95
EqualMethod · 0.80
PublishMethod · 0.65
RequestWithContextMethod · 0.45

Tested by

no test coverage detected