Skip to content

Commit

Permalink
core/state: use parallel trie insert to update storage trie
Browse files Browse the repository at this point in the history
When there are more than ParallelInsertThreshold (currently set to 1000) pending
storage updates to a storage trie, we will use the new TryBatchInsert to insert
these storage entries in parallel when possible.
  • Loading branch information
minh-bq committed Oct 10, 2024
1 parent ea23006 commit a9905e4
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 1 deletion.
87 changes: 87 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -4312,3 +4313,89 @@ func TestSidecarsPruning(t *testing.T) {
}
}
}

// TestBlockChain_2000StorageUpdate inserts one block containing 2000
// transactions, each storing to a distinct slot of a pre-deployed contract
// (1000 slots overwrite genesis-populated values, 1000 create new entries).
// This pushes the pending storage count past ParallelInsertThreshold so the
// parallel trie-insert path in updateTrie is exercised, and the resulting
// block hash is compared against a known-good value to prove the parallel
// path computes the same storage trie root as sequential insertion.
func TestBlockChain_2000StorageUpdate(t *testing.T) {
	var (
		numTxs          = 2000
		signer          = types.HomesteadSigner{}
		testBankKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
		testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
		bankFunds       = big.NewInt(100000000000000000)
		contractAddress = common.HexToAddress("0x1234")
		gspec           = Genesis{
			Config: params.TestChainConfig,
			Alloc: GenesisAlloc{
				testBankAddress: {Balance: bankFunds},
				contractAddress: {
					Nonce:   1,
					Balance: common.Big0,
					// Store 1 into the slot passed via calldata.
					Code: []byte{
						byte(vm.PUSH0),
						byte(vm.CALLDATALOAD),
						byte(vm.PUSH1),
						byte(0x1),
						byte(vm.SWAP1),
						byte(vm.SSTORE),
						byte(vm.STOP),
					},
					Storage: make(map[common.Hash]common.Hash),
				},
			},
			GasLimit: 100e6, // 100 M
		}
	)

	// Pre-populate the first 1000 slots so that half of the 2000 updates
	// overwrite existing values and half insert fresh entries.
	for i := 0; i < 1000; i++ {
		gspec.Alloc[contractAddress].Storage[common.BigToHash(big.NewInt(int64(i)))] = common.BigToHash(big.NewInt(0x100))
	}

	engine := ethash.NewFaker()
	db := rawdb.NewMemoryDatabase()
	genesis := gspec.MustCommit(db)

	// Each transaction stores into its own slot (slot index == tx nonce),
	// passed to the contract as big-endian calldata.
	blockGenerator := func(i int, block *BlockGen) {
		block.SetCoinbase(common.Address{1})
		for txi := 0; txi < numTxs; txi++ {
			var calldata [32]byte
			binary.BigEndian.PutUint64(calldata[:], uint64(txi))
			tx, err := types.SignTx(
				types.NewTransaction(uint64(txi), contractAddress, common.Big0, 100_000,
					block.header.BaseFee, calldata[:]),
				signer,
				testBankKey)
			if err != nil {
				// Abort instead of t.Error: continuing would hand an
				// invalid tx to AddTx and panic with a confusing trace.
				t.Fatal(err)
			}
			block.AddTx(tx)
		}
	}

	shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 1, blockGenerator, true)

	// Back the chain with pebble in a per-test temporary directory; the
	// testing framework removes it automatically, even on panic, which a
	// manual os.Mkdir + defer os.RemoveAll("./pebble") does not guarantee.
	diskdb, err := rawdb.NewPebbleDBDatabase(t.TempDir(), 1024, 500000, "", false, false)
	if err != nil {
		t.Fatal(err)
	}
	defer diskdb.Close()
	gspec.MustCommit(diskdb)

	chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil)
	if err != nil {
		t.Fatalf("failed to create tester chain: %v", err)
	}
	if _, err := chain.InsertChain(shared, nil); err != nil {
		t.Fatalf("failed to insert shared chain: %v", err)
	}

	// The expected hash pins the post-state: any divergence between the
	// parallel and sequential insert paths changes the storage root and
	// therefore the block hash.
	expected := common.HexToHash("0x684f656efba5a77f0e8b4c768a2b3479b28250fd7b81dbb9a888abf6180b01bd")
	if blockHash := chain.CurrentBlock().Hash(); blockHash != expected {
		// Report the real expected hash, not common.Hash{} as before.
		t.Fatalf("Block hash mismatches, exp %s got %s", expected, blockHash)
	}
}
24 changes: 23 additions & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

var emptyCodeHash = crypto.Keccak256(nil)

const ParallelInsertThreshold = 1000

type Code []byte

func (c Code) String() string {
Expand Down Expand Up @@ -338,6 +341,16 @@ func (s *stateObject) updateTrie(db Database) Trie {
tr := s.getTrie(db)
hasher := s.db.hasher

var (
parallelInsert, ok bool
secureTrie *trie.SecureTrie
keys, values [][]byte
)
if len(s.pendingStorage) > ParallelInsertThreshold {
if secureTrie, ok = tr.(*trie.SecureTrie); ok {
parallelInsert = true
}
}
usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
Expand All @@ -353,8 +366,14 @@ func (s *stateObject) updateTrie(db Database) Trie {
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
if parallelInsert {
key := key
keys = append(keys, key[:])
values = append(values, v)
} else {
s.setError(tr.TryUpdate(key[:], v))
}
}
// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
Expand All @@ -369,6 +388,9 @@ func (s *stateObject) updateTrie(db Database) Trie {
}
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
}
if parallelInsert && len(keys) > 0 {
s.setError(secureTrie.TryBatchInsert(keys, values))
}
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.data.Root, usedStorage)
}
Expand Down

0 comments on commit a9905e4

Please sign in to comment.