-
Notifications
You must be signed in to change notification settings - Fork 268
/
import.go
239 lines (209 loc) · 6.41 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package iavl
import (
"bytes"
"errors"
"fmt"
"cosmossdk.io/core/store"
)
// maxBatchSize is the maximum size of the import batch before flushing it to the database
const maxBatchSize = 10000
// ErrNoImport is returned when calling methods on a closed importer
var ErrNoImport = errors.New("no import in progress")
// Importer imports data into an empty MutableTree. It is created by MutableTree.Import(). Users
// must call Close() when done.
//
// ExportNodes must be imported in the order returned by Exporter, i.e. depth-first post-order (LRN).
//
// Importer is not concurrency-safe, it is the caller's responsibility to ensure the tree is not
// modified while performing an import.
type Importer struct {
tree *MutableTree
version int64
batch store.Batch
batchSize uint32
stack []*Node
nonces []uint32
// inflightCommit tracks a batch commit, if any.
inflightCommit <-chan error
}
// newImporter creates a new Importer for an empty MutableTree.
//
// version should correspond to the version that was initially exported. It must be greater than
// or equal to the highest ExportNode version number given.
func newImporter(tree *MutableTree, version int64) (*Importer, error) {
if version < 0 {
return nil, errors.New("imported version cannot be negative")
}
if tree.ndb.latestVersion > 0 {
return nil, fmt.Errorf("found database at version %d, must be 0", tree.ndb.latestVersion)
}
if !tree.IsEmpty() {
return nil, errors.New("tree must be empty")
}
return &Importer{
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
}, nil
}
// writeNode writes the node content to the storage.
func (i *Importer) writeNode(node *Node) error {
node._hash(node.nodeKey.version)
if err := node.validate(); err != nil {
return err
}
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)
if err := node.writeBytes(buf); err != nil {
return err
}
bytesCopy := make([]byte, buf.Len())
copy(bytesCopy, buf.Bytes())
if err := i.batch.Set(i.tree.ndb.nodeKey(node.GetKey()), bytesCopy); err != nil {
return err
}
i.batchSize++
if i.batchSize >= maxBatchSize {
// Wait for previous batch.
var err error
if i.inflightCommit != nil {
err = <-i.inflightCommit
i.inflightCommit = nil
}
if err != nil {
return err
}
result := make(chan error)
i.inflightCommit = result
go func(batch store.Batch) {
defer batch.Close()
result <- batch.Write()
}(i.batch)
i.batch = i.tree.ndb.db.NewBatch()
i.batchSize = 0
}
return nil
}
// Close frees all resources. It is safe to call multiple times. Uncommitted nodes may already have
// been flushed to the database, but will not be visible.
func (i *Importer) Close() {
if i.inflightCommit != nil {
<-i.inflightCommit
i.inflightCommit = nil
}
if i.batch != nil {
i.batch.Close()
}
i.batch = nil
i.tree = nil
}
// Add adds an ExportNode to the import. ExportNodes must be added in the order returned by
// Exporter, i.e. depth-first post-order (LRN). Nodes are periodically flushed to the database,
// but the imported version is not visible until Commit() is called.
func (i *Importer) Add(exportNode *ExportNode) error {
if i.tree == nil {
return ErrNoImport
}
if exportNode == nil {
return errors.New("node cannot be nil")
}
if exportNode.Version > i.version {
return fmt.Errorf("node version %v can't be greater than import version %v",
exportNode.Version, i.version)
}
node := &Node{
key: exportNode.Key,
value: exportNode.Value,
subtreeHeight: exportNode.Height,
}
// We build the tree from the bottom-left up. The stack is used to store unresolved left
// children while constructing right children. When all children are built, the parent can
// be constructed and the resolved children can be discarded from the stack. Using a stack
// ensures that we can handle additional unresolved left children while building a right branch.
//
// We don't modify the stack until we've verified the built node, to avoid leaving the
// importer in an inconsistent state when we return an error.
stackSize := len(i.stack)
if node.subtreeHeight == 0 {
node.size = 1
} else if stackSize >= 2 && i.stack[stackSize-1].subtreeHeight < node.subtreeHeight && i.stack[stackSize-2].subtreeHeight < node.subtreeHeight {
leftNode := i.stack[stackSize-2]
rightNode := i.stack[stackSize-1]
node.leftNode = leftNode
node.rightNode = rightNode
node.leftNodeKey = leftNode.GetKey()
node.rightNodeKey = rightNode.GetKey()
node.size = leftNode.size + rightNode.size
// Update the stack now.
if err := i.writeNode(leftNode); err != nil {
return err
}
if err := i.writeNode(rightNode); err != nil {
return err
}
i.stack = i.stack[:stackSize-2]
// remove the recursive references to avoid memory leak
leftNode.leftNode = nil
leftNode.rightNode = nil
rightNode.leftNode = nil
rightNode.rightNode = nil
}
i.nonces[exportNode.Version]++
node.nodeKey = &NodeKey{
version: exportNode.Version,
// Nonce is 1-indexed, but start at 2 since the root node having a nonce of 1.
nonce: i.nonces[exportNode.Version] + 1,
}
i.stack = append(i.stack, node)
return nil
}
// Commit finalizes the import by flushing any outstanding nodes to the database, making the
// version visible, and updating the tree metadata. It can only be called once, and calls Close()
// internally.
func (i *Importer) Commit() error {
if i.tree == nil {
return ErrNoImport
}
switch len(i.stack) {
case 0:
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
return err
}
case 1:
i.stack[0].nodeKey.nonce = 1
if err := i.writeNode(i.stack[0]); err != nil {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKey(i.stack[0].nodeKey.GetKey())); err != nil {
return err
}
}
default:
return fmt.Errorf("invalid node structure, found stack size %v when committing",
len(i.stack))
}
// Wait for previous batch.
var err error
if i.inflightCommit != nil {
err = <-i.inflightCommit
i.inflightCommit = nil
}
if err != nil {
return err
}
err = i.batch.WriteSync()
if err != nil {
return err
}
i.tree.ndb.resetLatestVersion(i.version)
_, err = i.tree.LoadVersion(i.version)
if err != nil {
return err
}
i.Close()
return nil
}