| 98 | ) |
| 99 | |
| 100 | func (db *DB) replayFunction() func(Entry, valuePointer) error { |
| 101 | type txnEntry struct { |
| 102 | nk []byte |
| 103 | v y.ValueStruct |
| 104 | } |
| 105 | |
| 106 | var txn []txnEntry |
| 107 | var lastCommit uint64 |
| 108 | |
| 109 | toLSM := func(nk []byte, vs y.ValueStruct) { |
| 110 | for err := db.ensureRoomForWrite(); err != nil; err = db.ensureRoomForWrite() { |
| 111 | db.opt.Debugf("Replay: Making room for writes") |
| 112 | time.Sleep(10 * time.Millisecond) |
| 113 | } |
| 114 | db.mt.Put(nk, vs) |
| 115 | } |
| 116 | |
| 117 | first := true |
| 118 | return func(e Entry, vp valuePointer) error { // Function for replaying. |
| 119 | if first { |
| 120 | db.opt.Debugf("First key=%q\n", e.Key) |
| 121 | } |
| 122 | first = false |
| 123 | db.orc.Lock() |
| 124 | if db.orc.nextTxnTs < y.ParseTs(e.Key) { |
| 125 | db.orc.nextTxnTs = y.ParseTs(e.Key) |
| 126 | } |
| 127 | db.orc.Unlock() |
| 128 | |
| 129 | nk := make([]byte, len(e.Key)) |
| 130 | copy(nk, e.Key) |
| 131 | var nv []byte |
| 132 | meta := e.meta |
| 133 | if db.shouldWriteValueToLSM(e) { |
| 134 | nv = make([]byte, len(e.Value)) |
| 135 | copy(nv, e.Value) |
| 136 | } else { |
| 137 | nv = vp.Encode() |
| 138 | meta = meta | bitValuePointer |
| 139 | // Update vhead. If the crash happens while replay was in progress |
| 140 | // and the head is not updated, we will end up replaying all the |
| 141 | // files again. |
| 142 | db.updateHead([]valuePointer{vp}) |
| 143 | } |
| 144 | |
| 145 | v := y.ValueStruct{ |
| 146 | Value: nv, |
| 147 | Meta: meta, |
| 148 | UserMeta: e.UserMeta, |
| 149 | ExpiresAt: e.ExpiresAt, |
| 150 | } |
| 151 | |
| 152 | switch { |
| 153 | case e.meta&bitFinTxn > 0: |
| 154 | txnTs, err := strconv.ParseUint(string(e.Value), 10, 64) |
| 155 | if err != nil { |
| 156 | return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value) |
| 157 | } |