(ctx context.Context, ackType ackType, sync bool, opts ackOpts)
| 397 | } |
| 398 | |
| 399 | func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool, opts ackOpts) error { |
| 400 | err := m.checkReply() |
| 401 | if err != nil { |
| 402 | return err |
| 403 | } |
| 404 | |
| 405 | m.Lock() |
| 406 | if m.ackd { |
| 407 | m.Unlock() |
| 408 | return ErrMsgAlreadyAckd |
| 409 | } |
| 410 | m.Unlock() |
| 411 | |
| 412 | if sync { |
| 413 | var cancel context.CancelFunc |
| 414 | ctx, cancel = m.js.wrapContextWithoutDeadline(ctx) |
| 415 | if cancel != nil { |
| 416 | defer cancel() |
| 417 | } |
| 418 | } |
| 419 | |
| 420 | var body []byte |
| 421 | if opts.nakDelay > 0 { |
| 422 | body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, opts.nakDelay.Nanoseconds())) |
| 423 | } else if opts.termReason != "" { |
| 424 | body = []byte(fmt.Sprintf("%s %s", ackType, opts.termReason)) |
| 425 | } else { |
| 426 | body = ackType |
| 427 | } |
| 428 | |
| 429 | if sync { |
| 430 | _, err = m.js.conn.RequestWithContext(ctx, m.msg.Reply, body) |
| 431 | } else { |
| 432 | err = m.js.conn.Publish(m.msg.Reply, body) |
| 433 | } |
| 434 | if err != nil { |
| 435 | return err |
| 436 | } |
| 437 | |
| 438 | // Mark that the message has been acked unless it is ackProgress |
| 439 | // which can be sent many times. |
| 440 | if !bytes.Equal(ackType, ackProgress) { |
| 441 | m.Lock() |
| 442 | m.ackd = true |
| 443 | m.Unlock() |
| 444 | } |
| 445 | return nil |
| 446 | } |
| 447 | |
| 448 | func (m *jetStreamMsg) checkReply() error { |
| 449 | if m == nil || m.msg.Sub == nil { |
no test coverage detected