ackReply handles all acks and does the right thing for both pull and sync mode. It ensures that an ack is sent only once, regardless of how many times it is called, to avoid duplicate acks.
(ackType []byte, sync bool, opts ...AckOpt)
| 3575 | // It ensures that an ack is only sent a single time, regardless of |
| 3576 | // how many times it is being called to avoid duplicated acks. |
| 3577 | func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error { |
| 3578 | var o ackOpts |
| 3579 | for _, opt := range opts { |
| 3580 | if err := opt.configureAck(&o); err != nil { |
| 3581 | return err |
| 3582 | } |
| 3583 | } |
| 3584 | |
| 3585 | if err := m.checkReply(); err != nil { |
| 3586 | return err |
| 3587 | } |
| 3588 | |
| 3589 | var ackNone bool |
| 3590 | var js *js |
| 3591 | |
| 3592 | sub := m.Sub |
| 3593 | sub.mu.Lock() |
| 3594 | nc := sub.conn |
| 3595 | if jsi := sub.jsi; jsi != nil { |
| 3596 | js = jsi.js |
| 3597 | ackNone = jsi.ackNone |
| 3598 | } |
| 3599 | sub.mu.Unlock() |
| 3600 | |
| 3601 | // Skip if already acked. |
| 3602 | if atomic.LoadUint32(&m.ackd) == 1 { |
| 3603 | return ErrMsgAlreadyAckd |
| 3604 | } |
| 3605 | if ackNone { |
| 3606 | return ErrCantAckIfConsumerAckNone |
| 3607 | } |
| 3608 | |
| 3609 | usesCtx := o.ctx != nil |
| 3610 | usesWait := o.ttl > 0 |
| 3611 | |
| 3612 | // Only allow either AckWait or Context option to set the timeout. |
| 3613 | if usesWait && usesCtx { |
| 3614 | return ErrContextAndTimeout |
| 3615 | } |
| 3616 | |
| 3617 | sync = sync || usesCtx || usesWait |
| 3618 | ctx := o.ctx |
| 3619 | wait := defaultRequestWait |
| 3620 | if usesWait { |
| 3621 | wait = o.ttl |
| 3622 | } else if js != nil { |
| 3623 | wait = js.opts.wait |
| 3624 | } |
| 3625 | |
| 3626 | var body []byte |
| 3627 | var err error |
| 3628 | // This will be > 0 only when called from NakWithDelay() |
| 3629 | if o.nakDelay > 0 { |
| 3630 | body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds())) |
| 3631 | } else { |
| 3632 | body = ackType |
| 3633 | } |
| 3634 |
no test coverage detected