MCPcopy
hub / github.com/nats-io/nats.go / getMsg

Method getMsg

jetstream/stream.go:561–622  ·  view source on GitHub ↗
(ctx context.Context, mreq *apiMsgGetRequest)

Source from the content-addressed store, hash-verified

559}
560
561func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStreamMsg, error) {
562 ctx, cancel := s.js.wrapContextWithoutDeadline(ctx)
563 if cancel != nil {
564 defer cancel()
565 }
566 req, err := json.Marshal(mreq)
567 if err != nil {
568 return nil, err
569 }
570
571 var gmSubj string
572
573 // handle direct gets
574 if s.info.Config.AllowDirect {
575 if mreq.LastFor != "" {
576 gmSubj = fmt.Sprintf(apiDirectMsgGetLastBySubjectT, s.name, mreq.LastFor)
577 r, err := s.js.apiRequest(ctx, gmSubj, nil)
578 if err != nil {
579 return nil, err
580 }
581 return convertDirectGetMsgResponseToMsg(r.msg)
582 }
583 gmSubj = fmt.Sprintf(apiDirectMsgGetT, s.name)
584 r, err := s.js.apiRequest(ctx, gmSubj, req)
585 if err != nil {
586 return nil, err
587 }
588 return convertDirectGetMsgResponseToMsg(r.msg)
589 }
590
591 var resp apiMsgGetResponse
592 dsSubj := fmt.Sprintf(apiMsgGetT, s.name)
593 _, err = s.js.apiRequestJSON(ctx, dsSubj, &resp, req)
594 if err != nil {
595 return nil, err
596 }
597
598 if resp.Error != nil {
599 if resp.Error.ErrorCode == JSErrCodeMessageNotFound {
600 return nil, ErrMsgNotFound
601 }
602 return nil, resp.Error
603 }
604
605 msg := resp.Message
606
607 var hdr nats.Header
608 if len(msg.Header) > 0 {
609 hdr, err = nats.DecodeHeadersMsg(msg.Header)
610 if err != nil {
611 return nil, err
612 }
613 }
614
615 return &RawStreamMsg{
616 Subject: msg.Subject,
617 Sequence: msg.Sequence,
618 Header: hdr,

Callers 2

GetMsgMethod · 0.95
GetLastMsgForSubjectMethod · 0.95

Calls 4

apiRequestMethod · 0.80
apiRequestJSONMethod · 0.80

Tested by

no test coverage detected