MCPcopy
hub / github.com/dgraph-io/badger / replayFunction

Method replayFunction

db.go:100–191  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

98)
99
100func (db *DB) replayFunction() func(Entry, valuePointer) error {
101 type txnEntry struct {
102 nk []byte
103 v y.ValueStruct
104 }
105
106 var txn []txnEntry
107 var lastCommit uint64
108
109 toLSM := func(nk []byte, vs y.ValueStruct) {
110 for err := db.ensureRoomForWrite(); err != nil; err = db.ensureRoomForWrite() {
111 db.opt.Debugf("Replay: Making room for writes")
112 time.Sleep(10 * time.Millisecond)
113 }
114 db.mt.Put(nk, vs)
115 }
116
117 first := true
118 return func(e Entry, vp valuePointer) error { // Function for replaying.
119 if first {
120 db.opt.Debugf("First key=%q\n", e.Key)
121 }
122 first = false
123 db.orc.Lock()
124 if db.orc.nextTxnTs < y.ParseTs(e.Key) {
125 db.orc.nextTxnTs = y.ParseTs(e.Key)
126 }
127 db.orc.Unlock()
128
129 nk := make([]byte, len(e.Key))
130 copy(nk, e.Key)
131 var nv []byte
132 meta := e.meta
133 if db.shouldWriteValueToLSM(e) {
134 nv = make([]byte, len(e.Value))
135 copy(nv, e.Value)
136 } else {
137 nv = vp.Encode()
138 meta = meta | bitValuePointer
139 // Update vhead. If the crash happens while replay was in progress
140 // and the head is not updated, we will end up replaying all the
141 // files again.
142 db.updateHead([]valuePointer{vp})
143 }
144
145 v := y.ValueStruct{
146 Value: nv,
147 Meta: meta,
148 UserMeta: e.UserMeta,
149 ExpiresAt: e.ExpiresAt,
150 }
151
152 switch {
153 case e.meta&bitFinTxn > 0:
154 txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
155 if err != nil {
156 return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value)
157 }

Callers 2

OpenFunction · 0.80

Calls 9

ensureRoomForWriteMethod · 0.95
shouldWriteValueToLSMMethod · 0.95
updateHeadMethod · 0.95
ParseTsFunction · 0.92
AssertTrueFunction · 0.92
PutMethod · 0.80
DebugfMethod · 0.65
WarningfMethod · 0.65
EncodeMethod · 0.45

Tested by 1