MCPcopy
hub / github.com/nats-io/nats.go / get

Method get

jetstream/kv.go:922–982  ·  view source on GitHub ↗
(ctx context.Context, key string, revision uint64)

Source from the content-addressed store, hash-verified

920}
921
922func (kv *kvs) get(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) {
923 if !keyValid(key) {
924 return nil, ErrInvalidKey
925 }
926
927 var b strings.Builder
928 b.WriteString(kv.pre)
929 b.WriteString(key)
930
931 var m *RawStreamMsg
932 var err error
933
934 if revision == kvLatestRevision {
935 m, err = kv.stream.GetLastMsgForSubject(ctx, b.String())
936 } else {
937 m, err = kv.stream.GetMsg(ctx, revision)
938 // If a sequence was provided, just make sure that the retrieved
939 // message subject matches the request.
940 if err == nil && m.Subject != b.String() {
941 return nil, ErrKeyNotFound
942 }
943 }
944 if err != nil {
945 if errors.Is(err, ErrMsgNotFound) {
946 err = ErrKeyNotFound
947 }
948 return nil, err
949 }
950
951 entry := &kve{
952 bucket: kv.name,
953 key: key,
954 value: m.Data,
955 revision: m.Sequence,
956 created: m.Time,
957 }
958
959 // Double check here that this is not a DEL Operation marker.
960 if len(m.Header) > 0 {
961 if m.Header.Get(kvop) != "" {
962 switch m.Header.Get(kvop) {
963 case kvdel:
964 entry.op = KeyValueDelete
965 case kvpurge:
966 entry.op = KeyValuePurge
967 }
968 } else if m.Header.Get(MarkerReasonHeader) != "" {
969 switch m.Header.Get(MarkerReasonHeader) {
970 case "MaxAge", "Purge":
971 entry.op = KeyValuePurge
972 case "Remove":
973 entry.op = KeyValueDelete
974 }
975 }
976 if entry.op != KeyValuePut {
977 return entry, ErrKeyDeleted
978 }
979 }

Callers 3

Get (Method) · 0.95
GetRevision (Method) · 0.95
Create (Method) · 0.95

Calls 6

keyValid (Function) · 0.70
GetLastMsgForSubject (Method) · 0.65
GetMsg (Method) · 0.65
Get (Method) · 0.65
String (Method) · 0.45
Is (Method) · 0.45

Tested by

no test coverage detected