| 920 | } |
| 921 | |
| 922 | func (kv *kvs) get(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) { |
| 923 | if !keyValid(key) { |
| 924 | return nil, ErrInvalidKey |
| 925 | } |
| 926 | |
| 927 | var b strings.Builder |
| 928 | b.WriteString(kv.pre) |
| 929 | b.WriteString(key) |
| 930 | |
| 931 | var m *RawStreamMsg |
| 932 | var err error |
| 933 | |
| 934 | if revision == kvLatestRevision { |
| 935 | m, err = kv.stream.GetLastMsgForSubject(ctx, b.String()) |
| 936 | } else { |
| 937 | m, err = kv.stream.GetMsg(ctx, revision) |
| 938 | // If a sequence was provided, just make sure that the retrieved |
| 939 | // message subject matches the request. |
| 940 | if err == nil && m.Subject != b.String() { |
| 941 | return nil, ErrKeyNotFound |
| 942 | } |
| 943 | } |
| 944 | if err != nil { |
| 945 | if errors.Is(err, ErrMsgNotFound) { |
| 946 | err = ErrKeyNotFound |
| 947 | } |
| 948 | return nil, err |
| 949 | } |
| 950 | |
| 951 | entry := &kve{ |
| 952 | bucket: kv.name, |
| 953 | key: key, |
| 954 | value: m.Data, |
| 955 | revision: m.Sequence, |
| 956 | created: m.Time, |
| 957 | } |
| 958 | |
| 959 | // Double check here that this is not a DEL Operation marker. |
| 960 | if len(m.Header) > 0 { |
| 961 | if m.Header.Get(kvop) != "" { |
| 962 | switch m.Header.Get(kvop) { |
| 963 | case kvdel: |
| 964 | entry.op = KeyValueDelete |
| 965 | case kvpurge: |
| 966 | entry.op = KeyValuePurge |
| 967 | } |
| 968 | } else if m.Header.Get(MarkerReasonHeader) != "" { |
| 969 | switch m.Header.Get(MarkerReasonHeader) { |
| 970 | case "MaxAge", "Purge": |
| 971 | entry.op = KeyValuePurge |
| 972 | case "Remove": |
| 973 | entry.op = KeyValueDelete |
| 974 | } |
| 975 | } |
| 976 | if entry.op != KeyValuePut { |
| 977 | return entry, ErrKeyDeleted |
| 978 | } |
| 979 | } |