(ctx context.Context, mreq *apiMsgGetRequest)
| 559 | } |
| 560 | |
| 561 | func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStreamMsg, error) { |
| 562 | ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) |
| 563 | if cancel != nil { |
| 564 | defer cancel() |
| 565 | } |
| 566 | req, err := json.Marshal(mreq) |
| 567 | if err != nil { |
| 568 | return nil, err |
| 569 | } |
| 570 | |
| 571 | var gmSubj string |
| 572 | |
| 573 | // handle direct gets |
| 574 | if s.info.Config.AllowDirect { |
| 575 | if mreq.LastFor != "" { |
| 576 | gmSubj = fmt.Sprintf(apiDirectMsgGetLastBySubjectT, s.name, mreq.LastFor) |
| 577 | r, err := s.js.apiRequest(ctx, gmSubj, nil) |
| 578 | if err != nil { |
| 579 | return nil, err |
| 580 | } |
| 581 | return convertDirectGetMsgResponseToMsg(r.msg) |
| 582 | } |
| 583 | gmSubj = fmt.Sprintf(apiDirectMsgGetT, s.name) |
| 584 | r, err := s.js.apiRequest(ctx, gmSubj, req) |
| 585 | if err != nil { |
| 586 | return nil, err |
| 587 | } |
| 588 | return convertDirectGetMsgResponseToMsg(r.msg) |
| 589 | } |
| 590 | |
| 591 | var resp apiMsgGetResponse |
| 592 | dsSubj := fmt.Sprintf(apiMsgGetT, s.name) |
| 593 | _, err = s.js.apiRequestJSON(ctx, dsSubj, &resp, req) |
| 594 | if err != nil { |
| 595 | return nil, err |
| 596 | } |
| 597 | |
| 598 | if resp.Error != nil { |
| 599 | if resp.Error.ErrorCode == JSErrCodeMessageNotFound { |
| 600 | return nil, ErrMsgNotFound |
| 601 | } |
| 602 | return nil, resp.Error |
| 603 | } |
| 604 | |
| 605 | msg := resp.Message |
| 606 | |
| 607 | var hdr nats.Header |
| 608 | if len(msg.Header) > 0 { |
| 609 | hdr, err = nats.DecodeHeadersMsg(msg.Header) |
| 610 | if err != nil { |
| 611 | return nil, err |
| 612 | } |
| 613 | } |
| 614 | |
| 615 | return &RawStreamMsg{ |
| 616 | Subject: msg.Subject, |
| 617 | Sequence: msg.Sequence, |
| 618 | Header: hdr, |
no test coverage detected