| 506 | } |
| 507 | |
| 508 | func parsePending(msg *nats.Msg) (int, int, error) { |
| 509 | msgsLeftStr := msg.Header.Get("Nats-Pending-Messages") |
| 510 | var msgsLeft int |
| 511 | var err error |
| 512 | if msgsLeftStr != "" { |
| 513 | msgsLeft, err = strconv.Atoi(msgsLeftStr) |
| 514 | if err != nil { |
| 515 | return 0, 0, errors.New("nats: invalid format of Nats-Pending-Messages") |
| 516 | } |
| 517 | } |
| 518 | bytesLeftStr := msg.Header.Get("Nats-Pending-Bytes") |
| 519 | var bytesLeft int |
| 520 | if bytesLeftStr != "" { |
| 521 | bytesLeft, err = strconv.Atoi(bytesLeftStr) |
| 522 | if err != nil { |
| 523 | return 0, 0, errors.New("nats: invalid format of Nats-Pending-Bytes") |
| 524 | } |
| 525 | } |
| 526 | return msgsLeft, bytesLeft, nil |
| 527 | } |
| 528 | |
| 529 | // toJSMsg converts core [nats.Msg] to [jetStreamMsg], exposing JetStream-specific operations |
| 530 | func (js *jetStream) toJSMsg(msg *nats.Msg) *jetStreamMsg { |