From 8c3b399f4dc1e409445534a19c7ab69801682e8c Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Wed, 6 Nov 2024 12:45:27 -0600 Subject: [PATCH 1/3] refactor sequencer app --- .../src/accounts/component.rs | 26 +- crates/astria-sequencer/src/app/app_abci.rs | 531 ++++++ .../src/app/benchmark_and_test_utils.rs | 1 + crates/astria-sequencer/src/app/benchmarks.rs | 2 +- crates/astria-sequencer/src/app/mod.rs | 1453 ++++++----------- .../src/app/tests_app/mempool.rs | 1 + .../astria-sequencer/src/app/tests_app/mod.rs | 82 +- .../src/app/tests_block_ordering.rs | 1 + .../src/app/tests_breaking_changes.rs | 5 +- .../src/authority/component.rs | 97 -- .../astria-sequencer/src/authority/genesis.rs | 42 + crates/astria-sequencer/src/authority/mod.rs | 5 +- crates/astria-sequencer/src/component.rs | 56 - .../src/fees/{component.rs => genesis.rs} | 26 +- crates/astria-sequencer/src/fees/mod.rs | 2 +- crates/astria-sequencer/src/genesis.rs | 18 + .../src/ibc/{component.rs => genesis.rs} | 33 +- crates/astria-sequencer/src/ibc/mod.rs | 2 +- crates/astria-sequencer/src/lib.rs | 2 +- .../astria-sequencer/src/service/consensus.rs | 5 +- .../src/service/mempool/tests.rs | 1 + 21 files changed, 1178 insertions(+), 1213 deletions(-) create mode 100644 crates/astria-sequencer/src/app/app_abci.rs delete mode 100644 crates/astria-sequencer/src/authority/component.rs create mode 100644 crates/astria-sequencer/src/authority/genesis.rs delete mode 100644 crates/astria-sequencer/src/component.rs rename crates/astria-sequencer/src/fees/{component.rs => genesis.rs} (89%) create mode 100644 crates/astria-sequencer/src/genesis.rs rename crates/astria-sequencer/src/ibc/{component.rs => genesis.rs} (57%) diff --git a/crates/astria-sequencer/src/accounts/component.rs b/crates/astria-sequencer/src/accounts/component.rs index c9d515ec45..23f4d210d0 100644 --- a/crates/astria-sequencer/src/accounts/component.rs +++ b/crates/astria-sequencer/src/accounts/component.rs @@ -1,28 +1,22 @@ -use std::sync::Arc; - use astria_core::protocol::genesis::v1::GenesisAppState; use astria_eyre::eyre::{ OptionExt as _, Result, WrapErr as _, }; -use tendermint::abci::request::{ - BeginBlock, - EndBlock, -}; use tracing::instrument; use crate::{ accounts, assets, - component::Component, + genesis::Genesis, }; #[derive(Default)] pub(crate) struct AccountsComponent; #[async_trait::async_trait] -impl Component for AccountsComponent { +impl Genesis for AccountsComponent { type AppState = GenesisAppState; #[instrument(name = "AccountsComponent::init_chain", skip_all)] @@ -47,20 +41,4 @@ impl Component for AccountsComponent { } Ok(()) } - - #[instrument(name = "AccountsComponent::begin_block", skip_all)] - async fn begin_block( - _state: &mut Arc, - _begin_block: &BeginBlock, - ) -> Result<()> { - Ok(()) - } - - #[instrument(name = "AccountsComponent::end_block", skip_all)] - async fn end_block( - _state: &mut Arc, - _end_block: &EndBlock, - ) -> Result<()> { - Ok(()) - } } diff --git a/crates/astria-sequencer/src/app/app_abci.rs b/crates/astria-sequencer/src/app/app_abci.rs new file mode 100644 index 0000000000..7e8eb2450b --- /dev/null +++ b/crates/astria-sequencer/src/app/app_abci.rs @@ -0,0 +1,531 @@ +use std::{ + collections::VecDeque, + sync::Arc, +}; + +use astria_core::{ + protocol::{ + abci::AbciErrorCode, + genesis::v1::GenesisAppState, + transaction::v1::{ + action::ValidatorUpdate, + Transaction, + }, + }, + Protobuf as _, +}; +use astria_eyre::eyre::{ + bail, + ensure, + eyre, + OptionExt as _, + Result, + WrapErr as _, +}; +use cnidarium::{ + ArcStateDeltaExt, + StateDelta, + StateRead, + Storage, +}; +use prost::Message as _; +use tendermint::{ + abci::{ + self, + types::ExecTxResult, + Code, + }, + AppHash, +}; +use tracing::{ + debug, + instrument, +}; + +use super::{ + App, + BlockData, + Mempool, +}; +use crate::{ + accounts::component::AccountsComponent, + address::StateWriteExt as _, + app::{ + PostTransactionExecutionResult, + StateWriteExt as _, + EXECUTION_RESULTS_KEY, + POST_TRANSACTION_EXECUTION_RESULT_KEY, + }, + assets::StateWriteExt as _, + authority::genesis::{ + AuthorityComponent, + AuthorityComponentAppState, + }, + bridge::StateReadExt as _, + fees::genesis::FeesComponent, + genesis::Genesis as _, + ibc::genesis::IbcComponent, + proposal::{ + block_size_constraints::BlockSizeConstraints, + commitment::{ + generate_rollup_datas_commitment, + GeneratedCommitments, + }, + }, + transaction::InvalidNonce, +}; + +#[async_trait::async_trait] +pub(crate) trait AppAbci { + async fn commit(&mut self, storage: Storage); + async fn finalize_block( + &mut self, + finalize_block: abci::request::FinalizeBlock, + storage: Storage, + ) -> Result; + async fn init_chain( + &mut self, + snapshot: Storage, + genesis_state: GenesisAppState, + genesis_validators: Vec, + chain_id: String, + ) -> Result; + async fn prepare_proposal( + &mut self, + prepare_proposal: abci::request::PrepareProposal, + storage: Storage, + ) -> Result; + async fn process_proposal( + &mut self, + process_proposal: abci::request::ProcessProposal, + storage: Storage, + ) -> Result<()>; +} + +#[async_trait::async_trait] +impl AppAbci for App { + #[instrument(name = "App::commit", skip_all)] + async fn commit(&mut self, storage: Storage) { + // Commit the pending writes, clearing the state. + let app_hash = storage + .commit_batch(self.write_batch.take().expect( + "write batch must be set, as `finalize_block` is always called before `commit`", + )) + .expect("must be able to successfully commit to storage"); + tracing::debug!( + app_hash = %telemetry::display::hex(&app_hash), + "finished committing state", + ); + self.app_hash = app_hash + .0 + .to_vec() + .try_into() + .expect("root hash to app hash conversion must succeed"); + + // Get the latest version of the state, now that we've committed it. + self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); + } + + /// Executes the given block, but does not write it to disk. + /// + /// `commit` must be called after this to write the block to disk. + /// + /// This is called by cometbft after the block has already been + /// committed by the network's consensus. + #[instrument(name = "App::finalize_block", skip_all)] + async fn finalize_block( + &mut self, + finalize_block: abci::request::FinalizeBlock, + storage: Storage, + ) -> Result { + // If we previously executed txs in a different proposal than is being processed, + // reset cached state changes. + if self.executed_proposal_hash != finalize_block.hash { + self.update_state_for_new_round(&storage); + } + + ensure!( + finalize_block.txs.len() >= 2, + "block must contain at least two transactions: the rollup transactions commitment and + rollup IDs commitment" + ); + + // When the hash is not empty, we have already executed and cached the results + if self.executed_proposal_hash.is_empty() { + // convert tendermint id to astria address; this assumes they are + // the same address, as they are both ed25519 keys + let proposer_address = finalize_block.proposer_address; + let height = finalize_block.height; + let time = finalize_block.time; + + // we haven't executed anything yet, so set up the state for execution. + let block_data = BlockData { + misbehavior: finalize_block.misbehavior, + height, + time, + }; + + self.prepare_state_for_execution(block_data) + .await + .wrap_err("failed to prepare app state for execution")?; + + let mut tx_results = Vec::with_capacity(finalize_block.txs.len()); + // skip the first two transactions, as they are the rollup data commitments + for tx in finalize_block.txs.iter().skip(2) { + let signed_tx = signed_transaction_from_bytes(tx) + .wrap_err("protocol error; only valid txs should be finalized")?; + + match self.execute_transaction(Arc::new(signed_tx)).await { + Ok(events) => tx_results.push(ExecTxResult { + events, + ..Default::default() + }), + Err(e) => { + // this is actually a protocol error, as only valid txs should be finalized + tracing::error!( + error = AsRef::::as_ref(&e), + "failed to finalize transaction; ignoring it", + ); + let code = if e.downcast_ref::().is_some() { + AbciErrorCode::INVALID_NONCE + } else { + AbciErrorCode::INTERNAL_ERROR + }; + tx_results.push(ExecTxResult { + code: Code::Err(code.value()), + info: code.info(), + log: format!("{e:#}"), + ..Default::default() + }); + } + } + } + + self.update_state_and_end_block( + finalize_block.hash, + height, + time, + proposer_address, + finalize_block.txs, + tx_results, + ) + .await + .wrap_err("failed to update state post-execution")?; + } + + // update the priority of any txs in the mempool based on the updated app state + if self.recost_mempool { + self.metrics.increment_mempool_recosted(); + } + update_mempool_after_finalization(&mut self.mempool, &self.state, self.recost_mempool) + .await; + + let post_transaction_execution_result: PostTransactionExecutionResult = self + .state + .object_get(POST_TRANSACTION_EXECUTION_RESULT_KEY) + .expect( + "post_transaction_execution_result must be present, as txs were already executed \ + just now or during the proposal phase", + ); + + // prepare the `StagedWriteBatch` for a later commit. + let app_hash = self + .prepare_commit(storage) + .await + .wrap_err("failed to prepare commit")?; + let finalize_block = abci::response::FinalizeBlock { + events: post_transaction_execution_result.events, + validator_updates: post_transaction_execution_result.validator_updates, + consensus_param_updates: None, + app_hash, + tx_results: post_transaction_execution_result.tx_results, + }; + + Ok(finalize_block) + } + + #[instrument(name = "App:init_chain", skip_all)] + async fn init_chain( + &mut self, + storage: Storage, + genesis_state: GenesisAppState, + genesis_validators: Vec, + chain_id: String, + ) -> Result { + let mut state_tx = self + .state + .try_begin_transaction() + .expect("state Arc should not be referenced elsewhere"); + + state_tx + .put_base_prefix(genesis_state.address_prefixes().base().to_string()) + .wrap_err("failed to write base prefix to state")?; + state_tx + .put_ibc_compat_prefix(genesis_state.address_prefixes().ibc_compat().to_string()) + .wrap_err("failed to write ibc-compat prefix to state")?; + + if let Some(native_asset) = genesis_state.native_asset_base_denomination() { + state_tx + .put_native_asset(native_asset.clone()) + .wrap_err("failed to write native asset to state")?; + state_tx + .put_ibc_asset(native_asset.clone()) + .wrap_err("failed to commit native asset as ibc asset to state")?; + } + + state_tx + .put_chain_id_and_revision_number(chain_id.try_into().context("invalid chain ID")?) + .wrap_err("failed to write chain id to state")?; + state_tx + .put_block_height(0) + .wrap_err("failed to write block height to state")?; + + // call init_chain on all components + FeesComponent::init_chain(&mut state_tx, &genesis_state) + .await + .wrap_err("init_chain failed on FeesComponent")?; + AccountsComponent::init_chain(&mut state_tx, &genesis_state) + .await + .wrap_err("init_chain failed on AccountsComponent")?; + AuthorityComponent::init_chain( + &mut state_tx, + &AuthorityComponentAppState { + authority_sudo_address: *genesis_state.authority_sudo_address(), + genesis_validators, + }, + ) + .await + .wrap_err("init_chain failed on AuthorityComponent")?; + IbcComponent::init_chain(&mut state_tx, &genesis_state) + .await + .wrap_err("init_chain failed on IbcComponent")?; + + state_tx.apply(); + + let app_hash = self + .prepare_commit(storage) + .await + .wrap_err("failed to prepare commit")?; + debug!(app_hash = %telemetry::display::base64(&app_hash), "init_chain completed"); + Ok(app_hash) + } + + /// Generates a commitment to the `sequence::Actions` in the block's transactions. + /// + /// This is required so that a rollup can easily verify that the transactions it + /// receives are correct (ie. we actually included in a sequencer block, and none + /// are missing) + /// It puts this special "commitment" as the first transaction in a block. + /// When other validators receive the block, they know the first transaction is + /// supposed to be the commitment, and verifies that is it correct. + #[instrument(name = "App::prepare_proposal", skip_all)] + async fn prepare_proposal( + &mut self, + prepare_proposal: abci::request::PrepareProposal, + storage: Storage, + ) -> Result { + self.executed_proposal_fingerprint = Some(prepare_proposal.clone().into()); + self.update_state_for_new_round(&storage); + + let mut block_size_constraints = BlockSizeConstraints::new( + usize::try_from(prepare_proposal.max_tx_bytes) + .wrap_err("failed to convert max_tx_bytes to usize")?, + ) + .wrap_err("failed to create block size constraints")?; + + let block_data = BlockData { + misbehavior: prepare_proposal.misbehavior, + height: prepare_proposal.height, + time: prepare_proposal.time, + }; + + self.prepare_state_for_execution(block_data) + .await + .wrap_err("failed to prepare for executing block")?; + + // ignore the txs passed by cometbft in favour of our app-side mempool + let (included_tx_bytes, signed_txs_included) = self + .prepare_proposal_tx_execution(&mut block_size_constraints) + .await + .wrap_err("failed to execute transactions")?; + self.metrics + .record_proposal_transactions(signed_txs_included.len()); + + let deposits = self.state.get_cached_block_deposits(); + self.metrics.record_proposal_deposits(deposits.len()); + + // generate commitment to sequence::Actions and deposits and commitment to the rollup IDs + // included in the block + let res = generate_rollup_datas_commitment(&signed_txs_included, deposits); + let txs = res.into_transactions(included_tx_bytes); + Ok(abci::response::PrepareProposal { + txs, + }) + } + + /// Generates a commitment to the `sequence::Actions` in the block's transactions + /// and ensures it matches the commitment created by the proposer, which + /// should be the first transaction in the block. + #[instrument(name = "App::process_proposal", skip_all)] + async fn process_proposal( + &mut self, + process_proposal: abci::request::ProcessProposal, + storage: Storage, + ) -> Result<()> { + // if we proposed this block (ie. prepare_proposal was called directly before this), then + // we skip execution for this `process_proposal` call. + // + // if we didn't propose this block, `self.validator_address` will be None or a different + // value, so we will execute block as normal. + if let Some(constructed_id) = self.executed_proposal_fingerprint { + let proposal_id = process_proposal.clone().into(); + if constructed_id == proposal_id { + debug!("skipping process_proposal as we are the proposer for this block"); + self.executed_proposal_fingerprint = None; + self.executed_proposal_hash = process_proposal.hash; + + // if we're the proposer, we should have the execution results from + // `prepare_proposal`. run the post-tx-execution hook to generate the + // `SequencerBlock` and to set `self.finalize_block`. + // + // we can't run this in `prepare_proposal` as we don't know the block hash there. + let Some(tx_results) = self.state.object_get(EXECUTION_RESULTS_KEY) else { + bail!("execution results must be present after executing transactions") + }; + + self.update_state_and_end_block( + process_proposal.hash, + process_proposal.height, + process_proposal.time, + process_proposal.proposer_address, + process_proposal.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; + + return Ok(()); + } + self.metrics.increment_process_proposal_skipped_proposal(); + debug!( + "our validator address was set but we're not the proposer, so our previous \ + proposal was skipped, executing block" + ); + self.executed_proposal_fingerprint = None; + } + + self.update_state_for_new_round(&storage); + + let mut txs = VecDeque::from(process_proposal.txs.clone()); + let received_rollup_datas_root: [u8; 32] = txs + .pop_front() + .ok_or_eyre("no transaction commitment in proposal")? + .to_vec() + .try_into() + .map_err(|_| eyre!("transaction commitment must be 32 bytes"))?; + + let received_rollup_ids_root: [u8; 32] = txs + .pop_front() + .ok_or_eyre("no chain IDs commitment in proposal")? + .to_vec() + .try_into() + .map_err(|_| eyre!("chain IDs commitment must be 32 bytes"))?; + + let expected_txs_len = txs.len(); + + let block_data = BlockData { + misbehavior: process_proposal.misbehavior, + height: process_proposal.height, + time: process_proposal.time, + }; + + self.prepare_state_for_execution(block_data) + .await + .wrap_err("failed to prepare for executing block")?; + + // we don't care about the cometbft max_tx_bytes here, as cometbft would have + // rejected the proposal if it was too large. + // however, we should still validate the other constraints, namely + // the max sequenced data bytes. + let mut block_size_constraints = BlockSizeConstraints::new_unlimited_cometbft(); + + // deserialize txs into `Transaction`s; + // this does not error if any txs fail to be deserialized, but the `execution_results.len()` + // check below ensures that all txs in the proposal are deserializable (and + // executable). + let signed_txs = txs + .into_iter() + .filter_map(|bytes| signed_transaction_from_bytes(bytes.as_ref()).ok()) + .collect::>(); + + let tx_results = self + .process_proposal_tx_execution(signed_txs.clone(), &mut block_size_constraints) + .await + .wrap_err("failed to execute transactions")?; + + // all txs in the proposal should be deserializable and executable + // if any txs were not deserializeable or executable, they would not have been + // added to the `tx_results` list, thus the length of `txs_to_include` + // will be shorter than that of `tx_results`. + ensure!( + tx_results.len() == expected_txs_len, + "transactions to be included do not match expected", + ); + self.metrics.record_proposal_transactions(signed_txs.len()); + + let deposits = self.state.get_cached_block_deposits(); + self.metrics.record_proposal_deposits(deposits.len()); + + let GeneratedCommitments { + rollup_datas_root: expected_rollup_datas_root, + rollup_ids_root: expected_rollup_ids_root, + } = generate_rollup_datas_commitment(&signed_txs, deposits); + ensure!( + received_rollup_datas_root == expected_rollup_datas_root, + "transaction commitment does not match expected", + ); + + ensure!( + received_rollup_ids_root == expected_rollup_ids_root, + "chain IDs commitment does not match expected", + ); + + self.executed_proposal_hash = process_proposal.hash; + self.update_state_and_end_block( + process_proposal.hash, + process_proposal.height, + process_proposal.time, + process_proposal.proposer_address, + process_proposal.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; + + Ok(()) + } +} + +// updates the mempool to reflect current state +// +// NOTE: this function locks the mempool until all accounts have been cleaned. +// this could potentially stall consensus from moving to the next round if +// the mempool is large, especially if recosting transactions. +async fn update_mempool_after_finalization( + mempool: &mut Mempool, + state: &S, + recost: bool, +) { + mempool.run_maintenance(state, recost).await; +} + +fn signed_transaction_from_bytes(bytes: &[u8]) -> Result { + use astria_core::generated::protocol::transaction::v1 as Raw; + + let raw = Raw::Transaction::decode(bytes) + .wrap_err("failed to decode protobuf to signed transaction")?; + let tx = Transaction::try_from_raw(raw) + .wrap_err("failed to transform raw signed transaction to verified type")?; + + Ok(tx) +} diff --git a/crates/astria-sequencer/src/app/benchmark_and_test_utils.rs b/crates/astria-sequencer/src/app/benchmark_and_test_utils.rs index 6c46e5c6e9..48d5ff8bce 100644 --- a/crates/astria-sequencer/src/app/benchmark_and_test_utils.rs +++ b/crates/astria-sequencer/src/app/benchmark_and_test_utils.rs @@ -39,6 +39,7 @@ use cnidarium::{ }; use telemetry::Metrics as _; +use super::app_abci::AppAbci as _; use crate::{ accounts::StateWriteExt as _, app::App, diff --git a/crates/astria-sequencer/src/app/benchmarks.rs b/crates/astria-sequencer/src/app/benchmarks.rs index 0b6aa3ef2b..e40f5e2b1b 100644 --- a/crates/astria-sequencer/src/app/benchmarks.rs +++ b/crates/astria-sequencer/src/app/benchmarks.rs @@ -105,7 +105,7 @@ fn execute_transactions_prepare_proposal(bencher: divan::Bencher) { let (_tx_bytes, included_txs) = runtime.block_on(async { fixture .app - .execute_transactions_prepare_proposal(constraints) + .prepare_proposal_tx_execution(constraints) .await .unwrap() }); diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 99d13a115c..80a075b669 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -1,4 +1,5 @@ mod action_handler; +pub(crate) mod app_abci; #[cfg(any(test, feature = "benchmark"))] pub(crate) mod benchmark_and_test_utils; #[cfg(feature = "benchmark")] @@ -16,24 +17,13 @@ mod tests_breaking_changes; #[cfg(test)] mod tests_execute_transaction; -use std::{ - collections::VecDeque, - sync::Arc, -}; +use std::sync::Arc; use astria_core::{ - generated::protocol::transaction::v1 as raw, - protocol::{ - abci::AbciErrorCode, - genesis::v1::GenesisAppState, - transaction::v1::{ - action::{ - group::Group, - ValidatorUpdate, - }, - Action, - Transaction, - }, + protocol::transaction::v1::{ + action::group::Group, + Action, + Transaction, }, sequencerblock::v1::block::SequencerBlock, Protobuf as _, @@ -42,9 +32,8 @@ use astria_eyre::{ anyhow_to_eyre, eyre::{ bail, - ensure, eyre, - OptionExt as _, + OptionExt, Result, WrapErr as _, }, @@ -54,7 +43,6 @@ use cnidarium::{ Snapshot, StagedWriteBatch, StateDelta, - StateRead, StateWrite, Storage, }; @@ -68,11 +56,9 @@ use tendermint::{ abci::{ self, types::ExecTxResult, - Code, Event, }, account, - block::Header, AppHash, Hash, }; @@ -90,17 +76,8 @@ pub(crate) use self::{ }, }; use crate::{ - accounts::{ - component::AccountsComponent, - StateWriteExt as _, - }, - address::StateWriteExt as _, - assets::StateWriteExt as _, + accounts::StateWriteExt as _, authority::{ - component::{ - AuthorityComponent, - AuthorityComponentAppState, - }, StateReadExt as _, StateWriteExt as _, }, @@ -108,25 +85,14 @@ use crate::{ StateReadExt as _, StateWriteExt as _, }, - component::Component as _, - fees::{ - component::FeesComponent, - StateReadExt as _, - }, + fees::StateReadExt as _, grpc::StateWriteExt as _, - ibc::component::IbcComponent, mempool::{ Mempool, RemovalReason, }, metrics::Metrics, - proposal::{ - block_size_constraints::BlockSizeConstraints, - commitment::{ - generate_rollup_datas_commitment, - GeneratedCommitments, - }, - }, + proposal::block_size_constraints::BlockSizeConstraints, transaction::InvalidNonce, }; @@ -269,280 +235,221 @@ impl App { }) } - #[instrument(name = "App:init_chain", skip_all)] - pub(crate) async fn init_chain( + #[instrument(skip_all, err)] + pub(crate) async fn prepare_commit(&mut self, storage: Storage) -> Result { + // extract the state we've built up to so we can prepare it as a `StagedWriteBatch`. + let dummy_state = StateDelta::new(storage.latest_snapshot()); + let mut state = Arc::try_unwrap(std::mem::replace(&mut self.state, Arc::new(dummy_state))) + .expect("we have exclusive ownership of the State at commit()"); + + // store the storage version indexed by block height + let new_version = storage.latest_version().wrapping_add(1); + let height = state + .get_block_height() + .await + .expect("block height must be set, as `put_block_height` was already called"); + state + .put_storage_version_by_height(height, new_version) + .wrap_err("failed to put storage version by height")?; + debug!( + height, + version = new_version, + "stored storage version for height" + ); + + let write_batch = storage + .prepare_commit(state) + .await + .map_err(anyhow_to_eyre) + .wrap_err("failed to prepare commit")?; + let app_hash: AppHash = write_batch + .root_hash() + .0 + .to_vec() + .try_into() + .wrap_err("failed to convert app hash")?; + self.write_batch = Some(write_batch); + Ok(app_hash) + } + + /// Executes a signed transaction. + #[instrument(name = "App::execute_transaction", skip_all)] + pub(crate) async fn execute_transaction( &mut self, - storage: Storage, - genesis_state: GenesisAppState, - genesis_validators: Vec, - chain_id: String, - ) -> Result { + signed_tx: Arc, + ) -> Result> { + signed_tx + .check_stateless() + .await + .wrap_err("stateless check failed")?; + let mut state_tx = self .state .try_begin_transaction() - .expect("state Arc should not be referenced elsewhere"); + .expect("state Arc should be present and unique"); - state_tx - .put_base_prefix(genesis_state.address_prefixes().base().to_string()) - .wrap_err("failed to write base prefix to state")?; - state_tx - .put_ibc_compat_prefix(genesis_state.address_prefixes().ibc_compat().to_string()) - .wrap_err("failed to write ibc-compat prefix to state")?; + signed_tx + .check_and_execute(&mut state_tx) + .await + .wrap_err("failed executing transaction")?; - if let Some(native_asset) = genesis_state.native_asset_base_denomination() { - state_tx - .put_native_asset(native_asset.clone()) - .wrap_err("failed to write native asset to state")?; - state_tx - .put_ibc_asset(native_asset.clone()) - .wrap_err("failed to commit native asset as ibc asset to state")?; - } + // flag mempool for cleaning if we ran a fee change action + self.recost_mempool = self.recost_mempool + || signed_tx.is_bundleable_sudo_action_group() + && signed_tx + .actions() + .iter() + .any(|act| act.is_fee_asset_change() || act.is_fee_change()); + + Ok(state_tx.apply().1) + } + + /// sets up the state for execution of the block's transactions. + /// set the current height and timestamp, and removes byzantine validators from state. + /// + /// this *must* be called anytime before a block's txs are executed, whether it's + /// during the proposal phase, or finalize_block phase. + #[instrument(name = "App::prepare_state_for_execution", skip_all, err)] + pub(crate) async fn prepare_state_for_execution( + &mut self, + block_data: BlockData, + ) -> Result<()> { + // reset recost flag + self.recost_mempool = false; + + let mut state_tx = StateDelta::new(self.state.clone()); state_tx - .put_chain_id_and_revision_number(chain_id.try_into().context("invalid chain ID")?) - .wrap_err("failed to write chain id to state")?; + .put_block_height(block_data.height.into()) + .wrap_err("failed to put block height")?; state_tx - .put_block_height(0) - .wrap_err("failed to write block height to state")?; + .put_block_timestamp(block_data.time) + .wrap_err("failed to put block timestamp")?; - // call init_chain on all components - FeesComponent::init_chain(&mut state_tx, &genesis_state) - .await - .wrap_err("init_chain failed on FeesComponent")?; - AccountsComponent::init_chain(&mut state_tx, &genesis_state) + let mut current_set = state_tx + .get_validator_set() .await - .wrap_err("init_chain failed on AccountsComponent")?; - AuthorityComponent::init_chain( - &mut state_tx, - &AuthorityComponentAppState { - authority_sudo_address: *genesis_state.authority_sudo_address(), - genesis_validators, - }, - ) - .await - .wrap_err("init_chain failed on AuthorityComponent")?; - IbcComponent::init_chain(&mut state_tx, &genesis_state) - .await - .wrap_err("init_chain failed on IbcComponent")?; + .wrap_err("failed getting validator set")?; - state_tx.apply(); + for misbehaviour in &block_data.misbehavior { + current_set.remove(&misbehaviour.validator.address); + } - let app_hash = self - .prepare_commit(storage) - .await - .wrap_err("failed to prepare commit")?; - debug!(app_hash = %telemetry::display::base64(&app_hash), "init_chain completed"); - Ok(app_hash) - } + state_tx + .put_validator_set(current_set) + .wrap_err("failed putting validator set")?; - fn update_state_for_new_round(&mut self, storage: &Storage) { - // reset app state to latest committed state, in case of a round not being committed - // but `self.state` was changed due to executing the previous round's data. - // - // if the previous round was committed, then the state stays the same. - // - // this also clears the ephemeral storage. - self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); + self.apply(state_tx); - // clear the cached executed proposal hash - self.executed_proposal_hash = Hash::default(); + Ok(()) } - /// Generates a commitment to the `sequence::Actions` in the block's transactions. + /// updates the app state after transaction execution, and generates the resulting + /// `SequencerBlock`. /// - /// This is required so that a rollup can easily verify that the transactions it - /// receives are correct (ie. we actually included in a sequencer block, and none - /// are missing) - /// It puts this special "commitment" as the first transaction in a block. - /// When other validators receive the block, they know the first transaction is - /// supposed to be the commitment, and verifies that is it correct. - #[instrument(name = "App::prepare_proposal", skip_all)] - pub(crate) async fn prepare_proposal( + /// this must be called after a block's transactions are executed. + #[instrument(name = "App::update_state_and_end_block", skip_all)] + pub(crate) async fn update_state_and_end_block( &mut self, - prepare_proposal: abci::request::PrepareProposal, - storage: Storage, - ) -> Result { - self.executed_proposal_fingerprint = Some(prepare_proposal.clone().into()); - self.update_state_for_new_round(&storage); - - let mut block_size_constraints = BlockSizeConstraints::new( - usize::try_from(prepare_proposal.max_tx_bytes) - .wrap_err("failed to convert max_tx_bytes to usize")?, - ) - .wrap_err("failed to create block size constraints")?; - - let block_data = BlockData { - misbehavior: prepare_proposal.misbehavior, - height: prepare_proposal.height, - time: prepare_proposal.time, - next_validators_hash: prepare_proposal.next_validators_hash, - proposer_address: prepare_proposal.proposer_address, + block_hash: Hash, + height: tendermint::block::Height, + time: tendermint::Time, + proposer_address: account::Id, + txs: Vec, + tx_results: Vec, + ) -> Result<()> { + let Hash::Sha256(block_hash) = block_hash else { + bail!("block hash is empty; this should not occur") }; - self.pre_execute_transactions(block_data) + let chain_id = self + .state + .get_chain_id() + .await + .wrap_err("failed to get chain ID from state")?; + let sudo_address = self + .state + .get_sudo_address() .await - .wrap_err("failed to prepare for executing block")?; + .wrap_err("failed to get sudo address from state")?; + + let mut state_tx = StateDelta::new(self.state.clone()); - // ignore the txs passed by cometbft in favour of our app-side mempool - let (included_tx_bytes, signed_txs_included) = self - .execute_transactions_prepare_proposal(&mut block_size_constraints) + // get validator updates + let validator_updates = state_tx + .get_validator_updates() .await - .wrap_err("failed to execute transactions")?; - self.metrics - .record_proposal_transactions(signed_txs_included.len()); + .wrap_err("failed getting validator updates")?; - let deposits = self.state.get_cached_block_deposits(); - self.metrics.record_proposal_deposits(deposits.len()); + // update validator set + let mut current_set = state_tx + .get_validator_set() + .await + .wrap_err("failed getting validator set")?; + current_set.apply_updates(validator_updates.clone()); + state_tx + .put_validator_set(current_set) + .wrap_err("failed putting validator set")?; - // generate commitment to sequence::Actions and deposits and commitment to the rollup IDs - // included in the block - let res = generate_rollup_datas_commitment(&signed_txs_included, deposits); - let txs = res.into_transactions(included_tx_bytes); - Ok(abci::response::PrepareProposal { - txs, - }) - } + // clear validator updates + state_tx.clear_validator_updates(); - /// Generates a commitment to the `sequence::Actions` in the block's transactions - /// and ensures it matches the commitment created by the proposer, which - /// should be the first transaction in the block. - #[instrument(name = "App::process_proposal", skip_all)] - pub(crate) async fn process_proposal( - &mut self, - process_proposal: abci::request::ProcessProposal, - storage: Storage, - ) -> Result<()> { - // if we proposed this block (ie. prepare_proposal was called directly before this), then - // we skip execution for this `process_proposal` call. - // - // if we didn't propose this block, `self.validator_address` will be None or a different - // value, so we will execute block as normal. - if let Some(constructed_id) = self.executed_proposal_fingerprint { - let proposal_id = process_proposal.clone().into(); - if constructed_id == proposal_id { - debug!("skipping process_proposal as we are the proposer for this block"); - self.executed_proposal_fingerprint = None; - self.executed_proposal_hash = process_proposal.hash; - - // if we're the proposer, we should have the execution results from - // `prepare_proposal`. run the post-tx-execution hook to generate the - // `SequencerBlock` and to set `self.finalize_block`. - // - // we can't run this in `prepare_proposal` as we don't know the block hash there. - let Some(tx_results) = self.state.object_get(EXECUTION_RESULTS_KEY) else { - bail!("execution results must be present after executing transactions") - }; - - self.post_execute_transactions( - process_proposal.hash, - process_proposal.height, - process_proposal.time, - process_proposal.proposer_address, - process_proposal.txs, - tx_results, - ) + // gather block fees and transfer them to the sudo address + let fees = self.state.get_block_fees(); + for fee in fees { + state_tx + .increase_balance(&sudo_address, fee.asset(), fee.amount()) .await - .wrap_err("failed to run post execute transactions handler")?; - - return Ok(()); - } - self.metrics.increment_process_proposal_skipped_proposal(); - debug!( - "our validator address was set but we're not the proposer, so our previous \ - proposal was skipped, executing block" - ); - self.executed_proposal_fingerprint = None; + .wrap_err("failed to increase fee recipient balance")?; } - self.update_state_for_new_round(&storage); - - let mut txs = VecDeque::from(process_proposal.txs.clone()); - let received_rollup_datas_root: [u8; 32] = txs - .pop_front() - .ok_or_eyre("no transaction commitment in proposal")? - .to_vec() - .try_into() - .map_err(|_| eyre!("transaction commitment must be 32 bytes"))?; + // get deposits for this block from state's ephemeral cache and put them to storage. + let deposits_in_this_block = self.state.get_cached_block_deposits(); + debug!( + deposits = %telemetry::display::json(&deposits_in_this_block), + "got block deposits from state" + ); - let received_rollup_ids_root: [u8; 32] = txs - .pop_front() - .ok_or_eyre("no chain IDs commitment in proposal")? - .to_vec() - .try_into() - .map_err(|_| eyre!("chain IDs commitment must be 32 bytes"))?; + state_tx + .put_deposits(&block_hash, deposits_in_this_block.clone()) + .wrap_err("failed to put deposits to state")?; - let expected_txs_len = txs.len(); + // cometbft expects a result for every tx in the block, so we need to return a + // tx result for the commitments, even though they're not actually user txs. + // + // the tx_results passed to this function only contain results for every user + // transaction, not the commitment, so its length is len(txs) - 2. + let mut finalize_block_tx_results: Vec = Vec::with_capacity(txs.len()); + finalize_block_tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2)); + finalize_block_tx_results.extend(tx_results); - let block_data = BlockData { - misbehavior: process_proposal.misbehavior, - height: process_proposal.height, - time: process_proposal.time, - next_validators_hash: process_proposal.next_validators_hash, - proposer_address: process_proposal.proposer_address, - }; + let sequencer_block = SequencerBlock::try_from_block_info_and_data( + block_hash, + chain_id, + height, + time, + proposer_address, + txs, + deposits_in_this_block, + ) + .wrap_err("failed to convert block info and data to SequencerBlock")?; + state_tx + .put_sequencer_block(sequencer_block) + .wrap_err("failed to write sequencer block to state")?; - self.pre_execute_transactions(block_data) - .await - .wrap_err("failed to prepare for executing block")?; - - // we don't care about the cometbft max_tx_bytes here, as cometbft would have - // rejected the proposal if it was too large. - // however, we should still validate the other constraints, namely - // the max sequenced data bytes. - let mut block_size_constraints = BlockSizeConstraints::new_unlimited_cometbft(); - - // deserialize txs into `Transaction`s; - // this does not error if any txs fail to be deserialized, but the `execution_results.len()` - // check below ensures that all txs in the proposal are deserializable (and - // executable). - let signed_txs = txs - .into_iter() - .filter_map(|bytes| signed_transaction_from_bytes(bytes.as_ref()).ok()) - .collect::>(); - - let tx_results = self - .execute_transactions_process_proposal(signed_txs.clone(), &mut block_size_constraints) - .await - .wrap_err("failed to execute transactions")?; - - // all txs in the proposal should be deserializable and executable - // if any txs were not deserializeable or executable, they would not have been - // added to the `tx_results` list, thus the length of `txs_to_include` - // will be shorter than that of `tx_results`. - ensure!( - tx_results.len() == expected_txs_len, - "transactions to be included do not match expected", - ); - self.metrics.record_proposal_transactions(signed_txs.len()); - - let deposits = self.state.get_cached_block_deposits(); - self.metrics.record_proposal_deposits(deposits.len()); - - let GeneratedCommitments { - rollup_datas_root: expected_rollup_datas_root, - rollup_ids_root: expected_rollup_ids_root, - } = generate_rollup_datas_commitment(&signed_txs, deposits); - ensure!( - received_rollup_datas_root == expected_rollup_datas_root, - "transaction commitment does not match expected", - ); + let events = self.apply(state_tx); - ensure!( - received_rollup_ids_root == expected_rollup_ids_root, - "chain IDs commitment does not match expected", - ); + let mut state_tx = StateDelta::new(self.state.clone()); + let result = PostTransactionExecutionResult { + events, + validator_updates: validator_updates + .try_into_cometbft() + .wrap_err("failed converting astria validators to cometbft compatible type")?, + tx_results: finalize_block_tx_results, + }; - self.executed_proposal_hash = process_proposal.hash; - self.post_execute_transactions( - process_proposal.hash, - process_proposal.height, - process_proposal.time, - process_proposal.proposer_address, - process_proposal.txs, - tx_results, - ) - .await - .wrap_err("failed to run post execute transactions handler")?; + state_tx.object_put(POST_TRANSACTION_EXECUTION_RESULT_KEY, result); + let _ = self.apply(state_tx); Ok(()) } @@ -566,20 +473,18 @@ impl App { /// /// As a result, all transactions in a sequencer block are guaranteed to execute /// successfully. - #[instrument(name = "App::execute_transactions_prepare_proposal", skip_all)] - async fn execute_transactions_prepare_proposal( + #[instrument(name = "App::prepare_proposal_tx_execution", skip_all)] + async fn prepare_proposal_tx_execution( &mut self, block_size_constraints: &mut BlockSizeConstraints, ) -> Result<(Vec, Vec)> { let mempool_len = self.mempool.len().await; debug!(mempool_len, "executing transactions from mempool"); - let mut validated_txs: Vec = Vec::new(); - let mut included_signed_txs = Vec::new(); - let mut failed_tx_count: usize = 0; - let mut execution_results = Vec::new(); - let mut excluded_txs: usize = 0; - let mut current_tx_group = Group::BundleableGeneral; + let mut proposal_info = Proposal::Prepare(PrepareProposalInformation::new( + self.mempool.clone(), + self.metrics, + )); // get copy of transactions to execute from mempool let pending_txs = self @@ -591,122 +496,30 @@ impl App { let mut unused_count = pending_txs.len(); for (tx_hash, tx) in pending_txs { unused_count = unused_count.saturating_sub(1); - let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string(); - let bytes = tx.to_raw().encode_to_vec(); - let tx_len = bytes.len(); - info!(transaction_hash = %tx_hash_base64, "executing transaction"); - - // don't include tx if it would make the cometBFT block too large - if !block_size_constraints.cometbft_has_space(tx_len) { - self.metrics - .increment_prepare_proposal_excluded_transactions_cometbft_space(); - debug!( - transaction_hash = %tx_hash_base64, - block_size_constraints = %json(&block_size_constraints), - tx_data_bytes = tx_len, - "excluding remaining transactions: max cometBFT data limit reached" - ); - excluded_txs = excluded_txs.saturating_add(1); - - // break from loop, as the block is full - break; - } - - // check if tx's sequence data will fit into sequence block - let tx_sequence_data_bytes = tx - .unsigned_transaction() - .actions() - .iter() - .filter_map(Action::as_rollup_data_submission) - .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())); - - if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { - self.metrics - .increment_prepare_proposal_excluded_transactions_sequencer_space(); - debug!( - transaction_hash = %tx_hash_base64, - block_size_constraints = %json(&block_size_constraints), - tx_data_bytes = tx_sequence_data_bytes, - "excluding transaction: max block sequenced data limit reached" - ); - excluded_txs = excluded_txs.saturating_add(1); - - // continue as there might be non-sequence txs that can fit - continue; - } - - // ensure transaction's group is less than or equal to current action group - let tx_group = tx.group(); - if tx_group > current_tx_group { - debug!( - transaction_hash = %tx_hash_base64, - block_size_constraints = %json(&block_size_constraints), - "excluding transaction: group is higher priority than previously included transactions" - ); - excluded_txs = excluded_txs.saturating_add(1); - - // note: we don't remove the tx from mempool as it may be valid in the future - continue; - } - - // execute tx and store in `execution_results` list on success - match self.execute_transaction(tx.clone()).await { - Ok(events) => { - execution_results.push(ExecTxResult { - events, - ..Default::default() - }); - block_size_constraints - .sequencer_checked_add(tx_sequence_data_bytes) - .wrap_err("error growing sequencer block size")?; - block_size_constraints - .cometbft_checked_add(tx_len) - .wrap_err("error growing cometBFT block size")?; - validated_txs.push(bytes.into()); - included_signed_txs.push((*tx).clone()); - } - Err(e) => { - self.metrics - .increment_prepare_proposal_excluded_transactions_failed_execution(); - debug!( - transaction_hash = %tx_hash_base64, - error = AsRef::::as_ref(&e), - "failed to execute transaction, not including in block" - ); - - if e.downcast_ref::().is_some() { - // we don't remove the tx from mempool if it failed to execute - // due to an invalid nonce, as it may be valid in the future. - // if it's invalid due to the nonce being too low, it'll be - // removed from the mempool in `update_mempool_after_finalization`. - // - // this is important for possible out-of-order transaction - // groups fed into prepare_proposal. a transaction with a higher - // nonce might be in a higher priority group than a transaction - // from the same account wiht a lower nonce. this higher nonce - // could execute in the next block fine. - } else { - failed_tx_count = failed_tx_count.saturating_add(1); - - // remove the failing transaction from the mempool - // - // this will remove any transactions from the same sender - // as well, as the dependent nonces will not be able - // to execute - self.mempool - .remove_tx_invalid( - tx, - RemovalReason::FailedPrepareProposal(e.to_string()), - ) - .await; - } - } - } - - // update current action group to tx's action group - current_tx_group = tx_group; + let tx_hash_base_64 = telemetry::display::base64(&tx_hash).to_string(); + info!(transaction_hash = %tx_hash_base_64, "executing transaction"); + + proposal_checks_and_tx_execution( + self, + tx, + &tx_hash_base_64, + block_size_constraints, + &mut proposal_info, + ) + .await?; } + let PrepareProposalInformation { + validated_txs, + included_signed_txs, + failed_tx_count, + execution_results, + excluded_txs, + .. + } = proposal_info + .into_prepare() + .ok_or_eyre("expected `Proposal::Prepare`, received `Proposal::Process`")?; + if failed_tx_count > 0 { info!( failed_tx_count = failed_tx_count, @@ -746,582 +559,366 @@ impl App { /// /// As a result, all transactions in a sequencer block are guaranteed to execute /// successfully. - #[instrument(name = "App::execute_transactions_process_proposal", skip_all)] - async fn execute_transactions_process_proposal( + #[instrument(name = "App::process_proposal_tx_execution", skip_all)] + async fn process_proposal_tx_execution( &mut self, txs: Vec, block_size_constraints: &mut BlockSizeConstraints, ) -> Result> { - let mut execution_results = Vec::new(); - let mut current_tx_group = Group::BundleableGeneral; + let mut proposal_info = Proposal::Process(ProcessProposalInformation::new()); for tx in txs { + let tx = Arc::new(tx); let bytes = tx.to_raw().encode_to_vec(); let tx_hash = Sha256::digest(&bytes); - let tx_len = bytes.len(); - - // check if tx's sequence data will fit into sequence block - let tx_sequence_data_bytes = tx - .unsigned_transaction() - .actions() - .iter() - .filter_map(Action::as_rollup_data_submission) - .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())); - - if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - block_size_constraints = %json(&block_size_constraints), - tx_data_bytes = tx_sequence_data_bytes, - "transaction error: max block sequenced data limit passed" - ); - bail!("max block sequenced data limit passed"); - } - - // ensure transaction's group is less than or equal to current action group - let tx_group = tx.group(); - if tx_group > current_tx_group { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - "transaction error: block has incorrect transaction group ordering" - ); - bail!("transactions have incorrect transaction group ordering"); - } - - // execute tx and store in `execution_results` list on success - match self.execute_transaction(Arc::new(tx.clone())).await { - Ok(events) => { - execution_results.push(ExecTxResult { - events, - ..Default::default() - }); - block_size_constraints - .sequencer_checked_add(tx_sequence_data_bytes) - .wrap_err("error growing sequencer block size")?; - block_size_constraints - .cometbft_checked_add(tx_len) - .wrap_err("error growing cometBFT block size")?; - } - Err(e) => { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - error = AsRef::::as_ref(&e), - "transaction error: failed to execute transaction" - ); - return Err(e.wrap_err("transaction failed to execute")); - } - } - - // update current action group to tx's action group - current_tx_group = tx_group; + let tx_hash_base_64 = telemetry::display::base64(&tx_hash).to_string(); + + proposal_checks_and_tx_execution( + self, + tx, + &tx_hash_base_64, + block_size_constraints, + &mut proposal_info, + ) + .await?; } + let ProcessProposalInformation { + execution_results, .. + } = proposal_info + .into_process() + .ok_or_eyre("expected `Proposal::Process`, received `Proposal::Prepare`")?; + Ok(execution_results) } - /// sets up the state for execution of the block's transactions. - /// set the current height and timestamp, and calls `begin_block` on all components. - /// - /// this *must* be called anytime before a block's txs are executed, whether it's - /// during the proposal phase, or finalize_block phase. - #[instrument(name = "App::pre_execute_transactions", skip_all, err)] - async fn pre_execute_transactions(&mut self, block_data: BlockData) -> Result<()> { - let chain_id = self - .state - .get_chain_id() - .await - .wrap_err("failed to get chain ID from state")?; - - // reset recost flag - self.recost_mempool = false; - - // call begin_block on all components - // NOTE: the fields marked `unused` are not used by any of the components; - // however, we need to still construct a `BeginBlock` type for now as - // the penumbra IBC implementation still requires it as a parameter. - let begin_block: abci::request::BeginBlock = abci::request::BeginBlock { - hash: Hash::default(), // unused - byzantine_validators: block_data.misbehavior.clone(), - header: Header { - app_hash: self.app_hash.clone(), - chain_id: chain_id.clone(), - consensus_hash: Hash::default(), // unused - data_hash: Some(Hash::default()), // unused - evidence_hash: Some(Hash::default()), // unused - height: block_data.height, - last_block_id: None, // unused - last_commit_hash: Some(Hash::default()), // unused - last_results_hash: Some(Hash::default()), // unused - next_validators_hash: block_data.next_validators_hash, - proposer_address: block_data.proposer_address, - time: block_data.time, - validators_hash: Hash::default(), // unused - version: tendermint::block::header::Version { - // unused - app: 0, - block: 0, - }, - }, - last_commit_info: tendermint::abci::types::CommitInfo { - round: 0u16.into(), // unused - votes: vec![], - }, // unused - }; - - self.begin_block(&begin_block) - .await - .wrap_err("begin_block failed")?; - - Ok(()) - } - - /// updates the app state after transaction execution, and generates the resulting - /// `SequencerBlock`. - /// - /// this must be called after a block's transactions are executed. - #[instrument(name = "App::post_execute_transactions", skip_all)] - async fn post_execute_transactions( - &mut self, - block_hash: Hash, - height: tendermint::block::Height, - time: tendermint::Time, - proposer_address: account::Id, - txs: Vec, - tx_results: Vec, - ) -> Result<()> { - let Hash::Sha256(block_hash) = block_hash else { - bail!("block hash is empty; this should not occur") - }; - - let chain_id = self - .state - .get_chain_id() - .await - .wrap_err("failed to get chain ID from state")?; - let sudo_address = self - .state - .get_sudo_address() - .await - .wrap_err("failed to get sudo address from state")?; - - let end_block = self.end_block(height.value(), &sudo_address).await?; + // StateDelta::apply only works when the StateDelta wraps an underlying + // StateWrite. But if we want to share the StateDelta with spawned tasks, + // we usually can't wrap a StateWrite instance, which requires exclusive + // access. This method "externally" applies the state delta to the + // inter-block state. + // + // Invariant: state_tx and self.state are the only two references to the + // inter-block state. + fn apply(&mut self, state_tx: StateDelta) -> Vec { + let (state2, mut cache) = state_tx.flatten(); + std::mem::drop(state2); + // Now there is only one reference to the inter-block state: self.state - // get deposits for this block from state's ephemeral cache and put them to storage. - let mut state_tx = StateDelta::new(self.state.clone()); - let deposits_in_this_block = self.state.get_cached_block_deposits(); - debug!( - deposits = %telemetry::display::json(&deposits_in_this_block), - "got block deposits from state" + let events = cache.take_events(); + cache.apply_to( + Arc::get_mut(&mut self.state).expect("no other references to inter-block state"), ); - state_tx - .put_deposits(&block_hash, deposits_in_this_block.clone()) - .wrap_err("failed to put deposits to state")?; + events + } - // cometbft expects a result for every tx in the block, so we need to return a - // tx result for the commitments, even though they're not actually user txs. + fn update_state_for_new_round(&mut self, storage: &Storage) { + // reset app state to latest committed state, in case of a round not being committed + // but `self.state` was changed due to executing the previous round's data. // - // the tx_results passed to this function only contain results for every user - // transaction, not the commitment, so its length is len(txs) - 2. - let mut finalize_block_tx_results: Vec = Vec::with_capacity(txs.len()); - finalize_block_tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2)); - finalize_block_tx_results.extend(tx_results); - - let sequencer_block = SequencerBlock::try_from_block_info_and_data( - block_hash, - chain_id, - height, - time, - proposer_address, - txs, - deposits_in_this_block, - ) - .wrap_err("failed to convert block info and data to SequencerBlock")?; - state_tx - .put_sequencer_block(sequencer_block) - .wrap_err("failed to write sequencer block to state")?; - - let result = PostTransactionExecutionResult { - events: end_block.events, - validator_updates: end_block.validator_updates, - consensus_param_updates: end_block.consensus_param_updates, - tx_results: finalize_block_tx_results, - }; - - state_tx.object_put(POST_TRANSACTION_EXECUTION_RESULT_KEY, result); - - // events that occur after end_block are ignored here; - // there should be none anyways. - let _ = self.apply(state_tx); + // if the previous round was committed, then the state stays the same. + // + // this also clears the ephemeral storage. + self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); - Ok(()) + // clear the cached executed proposal hash + self.executed_proposal_hash = Hash::default(); } +} - /// Executes the given block, but does not write it to disk. - /// - /// `commit` must be called after this to write the block to disk. - /// - /// This is called by cometbft after the block has already been - /// committed by the network's consensus. - #[instrument(name = "App::finalize_block", skip_all)] - pub(crate) async fn finalize_block( - &mut self, - finalize_block: abci::request::FinalizeBlock, - storage: Storage, - ) -> Result { - // If we previously executed txs in a different proposal than is being processed, - // reset cached state changes. - if self.executed_proposal_hash != finalize_block.hash { - self.update_state_for_new_round(&storage); - } +/// relevant data of a block being executed. +/// +/// used to setup the state before execution of transactions. +#[derive(Debug, Clone)] +struct BlockData { + misbehavior: Vec, + height: tendermint::block::Height, + time: tendermint::Time, +} - ensure!( - finalize_block.txs.len() >= 2, - "block must contain at least two transactions: the rollup transactions commitment and - rollup IDs commitment" - ); +#[derive(Clone, Debug)] +struct PostTransactionExecutionResult { + events: Vec, + tx_results: Vec, + validator_updates: Vec, +} - // When the hash is not empty, we have already executed and cached the results - if self.executed_proposal_hash.is_empty() { - // convert tendermint id to astria address; this assumes they are - // the same address, as they are both ed25519 keys - let proposer_address = finalize_block.proposer_address; - let height = finalize_block.height; - let time = finalize_block.time; - - // we haven't executed anything yet, so set up the state for execution. - let block_data = BlockData { - misbehavior: finalize_block.misbehavior, - height, - time, - next_validators_hash: finalize_block.next_validators_hash, - proposer_address, - }; - - self.pre_execute_transactions(block_data) - .await - .wrap_err("failed to execute block")?; - - let mut tx_results = Vec::with_capacity(finalize_block.txs.len()); - // skip the first two transactions, as they are the rollup data commitments - for tx in finalize_block.txs.iter().skip(2) { - let signed_tx = signed_transaction_from_bytes(tx) - .wrap_err("protocol error; only valid txs should be finalized")?; - - match self.execute_transaction(Arc::new(signed_tx)).await { - Ok(events) => tx_results.push(ExecTxResult { - events, - ..Default::default() - }), - Err(e) => { - // this is actually a protocol error, as only valid txs should be finalized - tracing::error!( - error = AsRef::::as_ref(&e), - "failed to finalize transaction; ignoring it", - ); - let code = if e.downcast_ref::().is_some() { - AbciErrorCode::INVALID_NONCE - } else { - AbciErrorCode::INTERNAL_ERROR - }; - tx_results.push(ExecTxResult { - code: Code::Err(code.value()), - info: code.info(), - log: format!("{e:#}"), - ..Default::default() - }); - } - } - } +#[derive(PartialEq)] +enum ExitContinue { + Exit, + Continue, +} - self.post_execute_transactions( - finalize_block.hash, - height, - time, - proposer_address, - finalize_block.txs, - tx_results, - ) - .await - .wrap_err("failed to run post execute transactions handler")?; - } +enum Proposal<'a> { + Prepare(PrepareProposalInformation<'a>), + Process(ProcessProposalInformation), +} - // update the priority of any txs in the mempool based on the updated app state - if self.recost_mempool { - self.metrics.increment_mempool_recosted(); +impl<'a> Proposal<'a> { + fn current_tx_group(&self) -> Group { + match self { + Proposal::Prepare(vars) => vars.current_tx_group, + Proposal::Process(vars) => vars.current_tx_group, } - update_mempool_after_finalization(&mut self.mempool, &self.state, self.recost_mempool) - .await; - - let post_transaction_execution_result: PostTransactionExecutionResult = self - .state - .object_get(POST_TRANSACTION_EXECUTION_RESULT_KEY) - .expect( - "post_transaction_execution_result must be present, as txs were already executed \ - just now or during the proposal phase", - ); - - // prepare the `StagedWriteBatch` for a later commit. - let app_hash = self - .prepare_commit(storage) - .await - .wrap_err("failed to prepare commit")?; - let finalize_block = abci::response::FinalizeBlock { - events: post_transaction_execution_result.events, - validator_updates: post_transaction_execution_result.validator_updates, - consensus_param_updates: post_transaction_execution_result.consensus_param_updates, - app_hash, - tx_results: post_transaction_execution_result.tx_results, - }; - - Ok(finalize_block) } - #[instrument(skip_all, err)] - async fn prepare_commit(&mut self, storage: Storage) -> Result { - // extract the state we've built up to so we can prepare it as a `StagedWriteBatch`. - let dummy_state = StateDelta::new(storage.latest_snapshot()); - let mut state = Arc::try_unwrap(std::mem::replace(&mut self.state, Arc::new(dummy_state))) - .expect("we have exclusive ownership of the State at commit()"); - - // store the storage version indexed by block height - let new_version = storage.latest_version().wrapping_add(1); - let height = state - .get_block_height() - .await - .expect("block height must be set, as `put_block_height` was already called"); - state - .put_storage_version_by_height(height, new_version) - .wrap_err("failed to put storage version by height")?; - debug!( - height, - version = new_version, - "stored storage version for height" - ); - - let write_batch = storage - .prepare_commit(state) - .await - .map_err(anyhow_to_eyre) - .wrap_err("failed to prepare commit")?; - let app_hash: AppHash = write_batch - .root_hash() - .0 - .to_vec() - .try_into() - .wrap_err("failed to convert app hash")?; - self.write_batch = Some(write_batch); - Ok(app_hash) + fn set_current_tx_group(&mut self, group: Group) { + match self { + Proposal::Prepare(vars) => vars.current_tx_group = group, + Proposal::Process(vars) => vars.current_tx_group = group, + } } - #[instrument(name = "App::begin_block", skip_all)] - async fn begin_block( - &mut self, - begin_block: &abci::request::BeginBlock, - ) -> Result> { - let mut state_tx = StateDelta::new(self.state.clone()); - - state_tx - .put_block_height(begin_block.header.height.into()) - .wrap_err("failed to put block height")?; - state_tx - .put_block_timestamp(begin_block.header.time) - .wrap_err("failed to put block timestamp")?; - - // call begin_block on all components - let mut arc_state_tx = Arc::new(state_tx); - AccountsComponent::begin_block(&mut arc_state_tx, begin_block) - .await - .wrap_err("begin_block failed on AccountsComponent")?; - AuthorityComponent::begin_block(&mut arc_state_tx, begin_block) - .await - .wrap_err("begin_block failed on AuthorityComponent")?; - IbcComponent::begin_block(&mut arc_state_tx, begin_block) - .await - .wrap_err("begin_block failed on IbcComponent")?; - FeesComponent::begin_block(&mut arc_state_tx, begin_block) - .await - .wrap_err("begin_block failed on FeesComponent")?; - - let state_tx = Arc::try_unwrap(arc_state_tx) - .expect("components should not retain copies of shared state"); - - Ok(self.apply(state_tx)) + fn into_prepare(self) -> Option> { + match self { + Proposal::Prepare(vars) => Some(vars), + Proposal::Process(_) => None, + } } - /// Executes a signed transaction. - #[instrument(name = "App::execute_transaction", skip_all)] - async fn execute_transaction(&mut self, signed_tx: Arc) -> Result> { - signed_tx - .check_stateless() - .await - .wrap_err("stateless check failed")?; - - let mut state_tx = self - .state - .try_begin_transaction() - .expect("state Arc should be present and unique"); - - signed_tx - .check_and_execute(&mut state_tx) - .await - .wrap_err("failed executing transaction")?; - - // flag mempool for cleaning if we ran a fee change action - self.recost_mempool = self.recost_mempool - || signed_tx.is_bundleable_sudo_action_group() - && signed_tx - .actions() - .iter() - .any(|act| act.is_fee_asset_change() || act.is_fee_change()); - - Ok(state_tx.apply().1) + fn into_process(self) -> Option { + match self { + Proposal::Prepare(_) => None, + Proposal::Process(vars) => Some(vars), + } } +} - #[instrument(name = "App::end_block", skip_all)] - async fn end_block( - &mut self, - height: u64, - fee_recipient: &[u8; 20], - ) -> Result { - let state_tx = StateDelta::new(self.state.clone()); - let mut arc_state_tx = Arc::new(state_tx); - - let end_block = abci::request::EndBlock { - height: height - .try_into() - .expect("a block height should be able to fit in an i64"), - }; - - // call end_block on all components - AccountsComponent::end_block(&mut arc_state_tx, &end_block) - .await - .wrap_err("end_block failed on AccountsComponent")?; - AuthorityComponent::end_block(&mut arc_state_tx, &end_block) - .await - .wrap_err("end_block failed on AuthorityComponent")?; - FeesComponent::end_block(&mut arc_state_tx, &end_block) - .await - .wrap_err("end_block failed on FeesComponent")?; - IbcComponent::end_block(&mut arc_state_tx, &end_block) - .await - .wrap_err("end_block failed on IbcComponent")?; - - let mut state_tx = Arc::try_unwrap(arc_state_tx) - .expect("components should not retain copies of shared state"); - - // gather and return validator updates - let validator_updates = self - .state - .get_validator_updates() - .await - .expect("failed getting validator updates"); +struct PrepareProposalInformation<'a> { + validated_txs: Vec, + included_signed_txs: Vec, + failed_tx_count: usize, + execution_results: Vec, + excluded_txs: usize, + current_tx_group: Group, + mempool: Mempool, + metrics: &'a Metrics, +} - // clear validator updates - state_tx.clear_validator_updates(); +impl<'a> PrepareProposalInformation<'a> { + fn new(mempool: Mempool, metrics: &'a Metrics) -> Self { + Self { + validated_txs: Vec::new(), + included_signed_txs: Vec::new(), + failed_tx_count: 0, + execution_results: Vec::new(), + excluded_txs: 0, + current_tx_group: Group::BundleableGeneral, + mempool, + metrics, + } + } +} - // gather block fees and transfer them to the block proposer - let fees = self.state.get_block_fees(); +struct ProcessProposalInformation { + execution_results: Vec, + current_tx_group: Group, +} - for fee in fees { - state_tx - .increase_balance(fee_recipient, fee.asset(), fee.amount()) - .await - .wrap_err("failed to increase fee recipient balance")?; +impl ProcessProposalInformation { + fn new() -> Self { + Self { + execution_results: Vec::new(), + current_tx_group: Group::BundleableGeneral, } - - let events = self.apply(state_tx); - Ok(abci::response::EndBlock { - validator_updates: validator_updates - .try_into_cometbft() - .wrap_err("failed converting astria validators to cometbft compatible type")?, - events, - ..Default::default() - }) } +} - #[instrument(name = "App::commit", skip_all)] - pub(crate) async fn commit(&mut self, storage: Storage) { - // Commit the pending writes, clearing the state. - let app_hash = storage - .commit_batch(self.write_batch.take().expect( - "write batch must be set, as `finalize_block` is always called before `commit`", - )) - .expect("must be able to successfully commit to storage"); - tracing::debug!( - app_hash = %telemetry::display::hex(&app_hash), - "finished committing state", - ); - self.app_hash = app_hash - .0 - .to_vec() - .try_into() - .expect("root hash to app hash conversion must succeed"); +#[instrument(skip_all)] +async fn proposal_checks_and_tx_execution( + app: &mut App, + tx: Arc, + tx_hash_base_64: &str, + block_size_constraints: &mut BlockSizeConstraints, + proposal_args: &mut Proposal<'_>, +) -> Result { + let tx_group = tx.group(); + let tx_sequence_data_bytes = tx_sequence_data_bytes(&tx); + let bytes = tx.to_raw().encode_to_vec(); + let tx_len = bytes.len(); + + let debug_msg = match proposal_args { + Proposal::Prepare(_) => "excluding transaction", + Proposal::Process(_) => "transaction error", + }; + + if let Proposal::Prepare(vars) = proposal_args { + if !block_size_constraints.cometbft_has_space(tx_len) { + vars.metrics + .increment_prepare_proposal_excluded_transactions_cometbft_space(); + debug!( + transaction_hash = %tx_hash_base_64, + block_size_constraints = %json(&block_size_constraints), + tx_data_bytes = tx_len, + "excluding remaining transactions: max cometBFT data limit reached" + ); + vars.excluded_txs = vars.excluded_txs.saturating_add(1); - // Get the latest version of the state, now that we've committed it. - self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); + // break from loop, as the block is full + return Ok(ExitContinue::Exit); + } } - // StateDelta::apply only works when the StateDelta wraps an underlying - // StateWrite. But if we want to share the StateDelta with spawned tasks, - // we usually can't wrap a StateWrite instance, which requires exclusive - // access. This method "externally" applies the state delta to the - // inter-block state. - // - // Invariant: state_tx and self.state are the only two references to the - // inter-block state. - fn apply(&mut self, state_tx: StateDelta) -> Vec { - let (state2, mut cache) = state_tx.flatten(); - std::mem::drop(state2); - // Now there is only one reference to the inter-block state: self.state + if ExitContinue::Exit + == check_sequencer_block_space( + tx_hash_base_64, + tx_sequence_data_bytes, + block_size_constraints, + debug_msg, + proposal_args, + )? + { + return Ok(ExitContinue::Exit); + }; + check_tx_group(tx_hash_base_64, tx_group, debug_msg, proposal_args)?; + handle_proposal_tx_execution( + app, + tx, + tx_sequence_data_bytes, + tx_len, + tx_hash_base_64, + block_size_constraints, + proposal_args, + ) + .await?; + proposal_args.set_current_tx_group(tx_group); + Ok(ExitContinue::Continue) +} - let events = cache.take_events(); - cache.apply_to( - Arc::get_mut(&mut self.state).expect("no other references to inter-block state"), +#[instrument(skip_all)] +fn check_sequencer_block_space( + tx_hash_base64: &str, + tx_sequence_data_bytes: usize, + block_size_constraints: &BlockSizeConstraints, + debug_msg: &str, + proposal_vars: &mut Proposal, +) -> Result { + if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { + debug!( + transaction_hash = %tx_hash_base64, + block_size_constraints = %json(&block_size_constraints), + tx_data_bytes = tx_sequence_data_bytes, + "{debug_msg}: max block sequenced data limit reached" ); - - events + return match proposal_vars { + Proposal::Prepare(vars) => { + vars.metrics + .increment_prepare_proposal_excluded_transactions_sequencer_space(); + vars.excluded_txs = vars.excluded_txs.saturating_add(1); + Ok(ExitContinue::Exit) + } + Proposal::Process(_) => Err(eyre!("max block sequenced data limit passed")), + }; } + Ok(ExitContinue::Continue) } -// updates the mempool to reflect current state -// -// NOTE: this function locks the mempool until all accounts have been cleaned. -// this could potentially stall consensus from moving to the next round if -// the mempool is large, especially if recosting transactions. -async fn update_mempool_after_finalization( - mempool: &mut Mempool, - state: &S, - recost: bool, -) { - mempool.run_maintenance(state, recost).await; +#[instrument(skip_all)] +fn check_tx_group( + tx_hash_base64: &str, + tx_group: Group, + debug_msg: &str, + proposal_vars: &mut Proposal, +) -> Result<()> { + if tx_group > proposal_vars.current_tx_group() { + debug!( + transaction_hash = %tx_hash_base64, + "{debug_msg}: group is higher priority than previously included transactions" + ); + match proposal_vars { + Proposal::Prepare(vars) => vars.excluded_txs = vars.excluded_txs.saturating_add(1), + Proposal::Process(_) => { + return Err(eyre!( + "transactions have incorrect transaction group ordering" + )); + } + }; + } + Ok(()) } -/// relevant data of a block being executed. -/// -/// used to setup the state before execution of transactions. -#[derive(Debug, Clone)] -struct BlockData { - misbehavior: Vec, - height: tendermint::block::Height, - time: tendermint::Time, - next_validators_hash: Hash, - proposer_address: account::Id, -} +#[instrument(skip_all)] +async fn handle_proposal_tx_execution( + app: &mut App, + tx: Arc, + tx_sequence_data_bytes: usize, + tx_len: usize, + tx_hash: &str, + block_size_constraints: &mut BlockSizeConstraints, + proposal_args: &mut Proposal<'_>, +) -> Result<()> { + let (debug_msg, execution_results) = match proposal_args { + Proposal::Prepare(vars) => ("excluding transaction", &mut vars.execution_results), + Proposal::Process(vars) => ("transaction error", &mut vars.execution_results), + }; + + match app.execute_transaction(tx.clone()).await { + Ok(events) => { + execution_results.push(ExecTxResult { + events, + ..Default::default() + }); + block_size_constraints + .sequencer_checked_add(tx_sequence_data_bytes) + .wrap_err("error growing sequencer block size")?; + block_size_constraints + .cometbft_checked_add(tx_len) + .wrap_err("error growing cometBFT block size")?; + if let Proposal::Prepare(vars) = proposal_args { + vars.validated_txs.push(tx.to_raw().encode_to_vec().into()); + vars.included_signed_txs.push((*tx).clone()); + } + } + Err(e) => { + debug!( + transaction_hash = %tx_hash, + error = AsRef::::as_ref(&e), + "{debug_msg}: failed to execute transaction" + ); + match proposal_args { + Proposal::Prepare(vars) => { + vars.metrics + .increment_prepare_proposal_excluded_transactions_failed_execution(); + if e.downcast_ref::().is_some() { + // we don't remove the tx from mempool if it failed to execute + // due to an invalid nonce, as it may be valid in the future. + // if it's invalid due to the nonce being too low, it'll be + // removed from the mempool in `update_mempool_after_finalization`. + // + // this is important for possible out-of-order transaction + // groups fed into prepare_proposal. a transaction with a higher + // nonce might be in a higher priority group than a transaction + // from the same account wiht a lower nonce. this higher nonce + // could execute in the next block fine. + } else { + vars.failed_tx_count = vars.failed_tx_count.saturating_add(1); -fn signed_transaction_from_bytes(bytes: &[u8]) -> Result { - let raw = raw::Transaction::decode(bytes) - .wrap_err("failed to decode protobuf to signed transaction")?; - let tx = Transaction::try_from_raw(raw) - .wrap_err("failed to transform raw signed transaction to verified type")?; + // remove the failing transaction from the mempool + // + // this will remove any transactions from the same sender + // as well, as the dependent nonces will not be able + // to execute + vars.mempool + .remove_tx_invalid( + tx, + RemovalReason::FailedPrepareProposal(e.to_string()), + ) + .await; + } + } + Proposal::Process(_) => return Err(e.wrap_err("transaction failed to execute")), + } + } + }; - Ok(tx) + Ok(()) } -#[derive(Clone, Debug)] -struct PostTransactionExecutionResult { - events: Vec, - tx_results: Vec, - validator_updates: Vec, - consensus_param_updates: Option, +fn tx_sequence_data_bytes(tx: &Arc) -> usize { + tx.unsigned_transaction() + .actions() + .iter() + .filter_map(Action::as_rollup_data_submission) + .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())) } diff --git a/crates/astria-sequencer/src/app/tests_app/mempool.rs b/crates/astria-sequencer/src/app/tests_app/mempool.rs index adc304d585..a632264de8 100644 --- a/crates/astria-sequencer/src/app/tests_app/mempool.rs +++ b/crates/astria-sequencer/src/app/tests_app/mempool.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use app_abci::AppAbci as _; use astria_core::{ protocol::{ fees::v1::TransferFeeComponents, diff --git a/crates/astria-sequencer/src/app/tests_app/mod.rs b/crates/astria-sequencer/src/app/tests_app/mod.rs index 1ca46cec4d..1e00dbf14f 100644 --- a/crates/astria-sequencer/src/app/tests_app/mod.rs +++ b/crates/astria-sequencer/src/app/tests_app/mod.rs @@ -2,6 +2,7 @@ mod mempool; use std::collections::HashMap; +use app_abci::AppAbci as _; use astria_core::{ primitive::v1::{ asset::TracePrefixed, @@ -16,6 +17,7 @@ use astria_core::{ RollupDataSubmission, SudoAddressChange, Transfer, + ValidatorUpdate, }, TransactionBody, }, @@ -46,12 +48,9 @@ use tendermint::{ }, account, block::{ - header::Version, - Header, Height, Round, }, - AppHash, Hash, Time, }; @@ -77,28 +76,6 @@ use crate::{ proposal::commitment::generate_rollup_datas_commitment, }; -fn default_tendermint_header() -> Header { - Header { - app_hash: AppHash::try_from(vec![]).unwrap(), - chain_id: "test".to_string().try_into().unwrap(), - consensus_hash: Hash::default(), - data_hash: Some(Hash::try_from([0u8; 32].to_vec()).unwrap()), - evidence_hash: Some(Hash::default()), - height: Height::default(), - last_block_id: None, - last_commit_hash: Some(Hash::default()), - last_results_hash: Some(Hash::default()), - next_validators_hash: Hash::default(), - proposer_address: account::Id::try_from([0u8; 20].to_vec()).unwrap(), - time: Time::now(), - validators_hash: Hash::default(), - version: Version { - app: 0, - block: 0, - }, - } -} - #[tokio::test] async fn app_genesis_and_init_chain() { let app = initialize_app(None, vec![]).await; @@ -125,18 +102,16 @@ async fn app_genesis_and_init_chain() { } #[tokio::test] -async fn app_pre_execute_transactions() { +async fn app_prepare_state_for_execution() { let mut app = initialize_app(None, vec![]).await; let block_data = BlockData { misbehavior: vec![], height: 1u8.into(), time: Time::now(), - next_validators_hash: Hash::default(), - proposer_address: account::Id::try_from([0u8; 20].to_vec()).unwrap(), }; - app.pre_execute_transactions(block_data.clone()) + app.prepare_state_for_execution(block_data.clone()) .await .unwrap(); assert_eq!(app.state.get_block_height().await.unwrap(), 1); @@ -147,7 +122,7 @@ async fn app_pre_execute_transactions() { } #[tokio::test] -async fn app_begin_block_remove_byzantine_validators() { +async fn app_prepare_state_for_execution_remove_byzantine_validators() { use tendermint::abci::types; let initial_validator_set = vec![ @@ -174,18 +149,13 @@ async fn app_begin_block_remove_byzantine_validators() { total_voting_power: 101u32.into(), }; - let mut begin_block = abci::request::BeginBlock { - header: default_tendermint_header(), - hash: Hash::default(), - last_commit_info: CommitInfo { - votes: vec![], - round: Round::default(), - }, - byzantine_validators: vec![misbehavior], + let block_data = BlockData { + misbehavior: vec![misbehavior], + height: 1u8.into(), + time: Time::now(), }; - begin_block.header.height = 1u8.into(); - app.begin_block(&begin_block).await.unwrap(); + app.prepare_state_for_execution(block_data).await.unwrap(); // assert that validator with pubkey_a is removed let validator_set = app.state.get_validator_set().await.unwrap(); @@ -912,10 +882,38 @@ async fn app_end_block_validator_updates() { .unwrap(); app.apply(state_tx); - let resp = app.end_block(1, &proposer_address).await.unwrap(); + let rollup_id = RollupId::from_unhashed_bytes(b"testchainid"); + let tx = TransactionBody::builder() + .actions(vec![ + RollupDataSubmission { + rollup_id, + data: Bytes::from_static(b"hello world"), + fee_asset: nria().into(), + } + .into(), + ]) + .chain_id("test") + .try_build() + .unwrap(); + let signed_tx = tx.sign(&get_alice_signing_key()); + let deposits = HashMap::from_iter(vec![(rollup_id, vec![])]); + let commitments = generate_rollup_datas_commitment(&[signed_tx.clone()], deposits.clone()); + + app.update_state_and_end_block( + Hash::Sha256([0u8; 32]), + 1u32.into(), + Time::now(), + proposer_address.to_vec().try_into().unwrap(), + commitments.into_transactions(vec![signed_tx.to_raw().encode_to_vec().into()]), + vec![], + ) + .await + .unwrap(); + let validator_updates = app.state.get_validator_updates().await.unwrap(); + // we only assert length here as the ordering of the updates is not guaranteed // and validator::Update does not implement Ord - assert_eq!(resp.validator_updates.len(), validator_updates.len()); + assert_eq!(validator_updates.len(), validator_updates.len()); // validator with pubkey_a should be removed (power set to 0) // validator with pubkey_b should be updated diff --git a/crates/astria-sequencer/src/app/tests_block_ordering.rs b/crates/astria-sequencer/src/app/tests_block_ordering.rs index c8c28893f4..1dbc8baff9 100644 --- a/crates/astria-sequencer/src/app/tests_block_ordering.rs +++ b/crates/astria-sequencer/src/app/tests_block_ordering.rs @@ -25,6 +25,7 @@ use tendermint::{ use super::test_utils::get_alice_signing_key; use crate::{ app::{ + app_abci::AppAbci as _, benchmark_and_test_utils::{ initialize_app_with_storage, mock_balances, diff --git a/crates/astria-sequencer/src/app/tests_breaking_changes.rs b/crates/astria-sequencer/src/app/tests_breaking_changes.rs index 031538a057..58d3bb064f 100644 --- a/crates/astria-sequencer/src/app/tests_breaking_changes.rs +++ b/crates/astria-sequencer/src/app/tests_breaking_changes.rs @@ -51,6 +51,7 @@ use tendermint::{ use crate::{ app::{ + app_abci::AppAbci as _, benchmark_and_test_utils::{ default_genesis_accounts, initialize_app_with_storage, @@ -64,7 +65,6 @@ use crate::{ initialize_app, }, }, - authority::StateReadExt as _, benchmark_and_test_utils::{ astria_address, astria_address_from_hex_string, @@ -345,9 +345,6 @@ async fn app_execute_transaction_with_every_action_snapshot() { let signed_tx = Arc::new(tx_bridge.sign(&bridge)); app.execute_transaction(signed_tx).await.unwrap(); - let sudo_address = app.state.get_sudo_address().await.unwrap(); - app.end_block(1, &sudo_address).await.unwrap(); - app.prepare_commit(storage.clone()).await.unwrap(); app.commit(storage.clone()).await; diff --git a/crates/astria-sequencer/src/authority/component.rs b/crates/astria-sequencer/src/authority/component.rs deleted file mode 100644 index f6347dcf0c..0000000000 --- a/crates/astria-sequencer/src/authority/component.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::sync::Arc; - -use astria_core::{ - primitive::v1::Address, - protocol::transaction::v1::action::ValidatorUpdate, -}; -use astria_eyre::eyre::{ - OptionExt as _, - Result, - WrapErr as _, -}; -use tendermint::abci::request::{ - BeginBlock, - EndBlock, -}; -use tracing::instrument; - -use super::{ - StateReadExt, - StateWriteExt, - ValidatorSet, -}; -use crate::component::Component; - -#[derive(Default)] -pub(crate) struct AuthorityComponent; - -#[derive(Debug)] -pub(crate) struct AuthorityComponentAppState { - pub(crate) authority_sudo_address: Address, - pub(crate) genesis_validators: Vec, -} - -#[async_trait::async_trait] -impl Component for AuthorityComponent { - type AppState = AuthorityComponentAppState; - - #[instrument(name = "AuthorityComponent::init_chain", skip_all)] - async fn init_chain(mut state: S, app_state: &Self::AppState) -> Result<()> { - // set sudo key and initial validator set - state - .put_sudo_address(app_state.authority_sudo_address) - .wrap_err("failed to set sudo key")?; - let genesis_validators = app_state.genesis_validators.clone(); - state - .put_validator_set(ValidatorSet::new_from_updates(genesis_validators)) - .wrap_err("failed to set validator set")?; - Ok(()) - } - - #[instrument(name = "AuthorityComponent::begin_block", skip_all)] - async fn begin_block( - state: &mut Arc, - begin_block: &BeginBlock, - ) -> Result<()> { - let mut current_set = state - .get_validator_set() - .await - .wrap_err("failed getting validator set")?; - - for misbehaviour in &begin_block.byzantine_validators { - current_set.remove(&misbehaviour.validator.address); - } - - let state = Arc::get_mut(state) - .ok_or_eyre("must only have one reference to the state; this is a bug")?; - state - .put_validator_set(current_set) - .wrap_err("failed putting validator set")?; - Ok(()) - } - - #[instrument(name = "AuthorityComponent::end_block", skip_all)] - async fn end_block( - state: &mut Arc, - _end_block: &EndBlock, - ) -> Result<()> { - // update validator set - let validator_updates = state - .get_validator_updates() - .await - .wrap_err("failed getting validator updates")?; - - let mut current_set = state - .get_validator_set() - .await - .wrap_err("failed getting validator set")?; - current_set.apply_updates(validator_updates); - - let state = Arc::get_mut(state) - .ok_or_eyre("must only have one reference to the state; this is a bug")?; - state - .put_validator_set(current_set) - .wrap_err("failed putting validator set")?; - Ok(()) - } -} diff --git a/crates/astria-sequencer/src/authority/genesis.rs b/crates/astria-sequencer/src/authority/genesis.rs new file mode 100644 index 0000000000..9527b908e9 --- /dev/null +++ b/crates/astria-sequencer/src/authority/genesis.rs @@ -0,0 +1,42 @@ +use astria_core::{ + primitive::v1::Address, + protocol::transaction::v1::action::ValidatorUpdate, +}; +use astria_eyre::eyre::{ + Result, + WrapErr as _, +}; +use tracing::instrument; + +use super::{ + StateWriteExt, + ValidatorSet, +}; +use crate::genesis::Genesis; + +#[derive(Default)] +pub(crate) struct AuthorityComponent; + +#[derive(Debug)] +pub(crate) struct AuthorityComponentAppState { + pub(crate) authority_sudo_address: Address, + pub(crate) genesis_validators: Vec, +} + +#[async_trait::async_trait] +impl Genesis for AuthorityComponent { + type AppState = AuthorityComponentAppState; + + #[instrument(name = "AuthorityComponent::init_chain", skip_all)] + async fn init_chain(mut state: S, app_state: &Self::AppState) -> Result<()> { + // set sudo key and initial validator set + state + .put_sudo_address(app_state.authority_sudo_address) + .wrap_err("failed to set sudo key")?; + let genesis_validators = app_state.genesis_validators.clone(); + state + .put_validator_set(ValidatorSet::new_from_updates(genesis_validators)) + .wrap_err("failed to set validator set")?; + Ok(()) + } +} diff --git a/crates/astria-sequencer/src/authority/mod.rs b/crates/astria-sequencer/src/authority/mod.rs index 22d8a00784..3dd4d4caaa 100644 --- a/crates/astria-sequencer/src/authority/mod.rs +++ b/crates/astria-sequencer/src/authority/mod.rs @@ -1,5 +1,5 @@ mod action; -pub(crate) mod component; +pub(crate) mod genesis; mod state_ext; pub(crate) mod storage; @@ -21,8 +21,7 @@ pub(crate) use state_ext::{ use crate::accounts::AddressBytes; /// A map of public keys to validator updates. -#[derive(Debug, PartialEq, Eq)] -#[cfg_attr(test, derive(Clone))] +#[derive(Debug, PartialEq, Eq, Clone)] pub(crate) struct ValidatorSet(BTreeMap<[u8; ADDRESS_LEN], ValidatorUpdate>); impl ValidatorSet { diff --git a/crates/astria-sequencer/src/component.rs b/crates/astria-sequencer/src/component.rs deleted file mode 100644 index f5e79beafc..0000000000 --- a/crates/astria-sequencer/src/component.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::sync::Arc; - -use astria_eyre::eyre::Result; -use async_trait::async_trait; -use cnidarium::StateWrite; -use tendermint::abci; - -/// A component of the Sequencer application. -/// Based off Penumbra's [`Component`], but with modifications. -#[async_trait] -pub(crate) trait Component { - /// A serialized representation of the component's application state, - /// passed in to [`Component::init_chain`]. - type AppState; - - /// Performs initialization, given the genesis state. - /// - /// This method is called once per chain, and should only perform - /// writes, since the backing tree for the [`State`] will - /// be empty. - async fn init_chain(state: S, app_state: &Self::AppState) -> Result<()>; - - /// Begins a new block, optionally inspecting the ABCI - /// [`BeginBlock`](abci::request::BeginBlock) request. - /// - /// # Invariants - /// - /// The `&mut Arc` allows the implementor to optionally share state with - /// its subtasks. The implementor SHOULD assume that when the method is - /// called, `state.get_mut().is_some()`, i.e., the `Arc` is not shared. The - /// implementor MUST ensure that any clones of the `Arc` are dropped before - /// it returns, so that `state.get_mut().is_some()` on completion. - async fn begin_block( - state: &mut Arc, - begin_block: &abci::request::BeginBlock, - ) -> Result<()>; - - /// Ends the block, optionally inspecting the ABCI - /// [`EndBlock`](abci::request::EndBlock) request, and performing any batch - /// processing. - /// - /// # Invariants - /// - /// This method should only be called after [`Component::begin_block`]. - /// No methods should be called following this method. - /// - /// The `&mut Arc` allows the implementor to optionally share state with - /// its subtasks. The implementor SHOULD assume that when the method is - /// called, `state.get_mut().is_some()`, i.e., the `Arc` is not shared. The - /// implementor MUST ensure that any clones of the `Arc` are dropped before - /// it returns, so that `state.get_mut().is_some()` on completion. - async fn end_block( - state: &mut Arc, - end_block: &abci::request::EndBlock, - ) -> Result<()>; -} diff --git a/crates/astria-sequencer/src/fees/component.rs b/crates/astria-sequencer/src/fees/genesis.rs similarity index 89% rename from crates/astria-sequencer/src/fees/component.rs rename to crates/astria-sequencer/src/fees/genesis.rs index 03ed7cb36c..d01c9fba11 100644 --- a/crates/astria-sequencer/src/fees/component.rs +++ b/crates/astria-sequencer/src/fees/genesis.rs @@ -1,26 +1,20 @@ -use std::sync::Arc; - use astria_core::protocol::genesis::v1::GenesisAppState; use astria_eyre::eyre::{ Result, WrapErr as _, }; -use tendermint::abci::request::{ - BeginBlock, - EndBlock, -}; use tracing::instrument; use crate::{ - component::Component, fees, + genesis::Genesis, }; #[derive(Default)] pub(crate) struct FeesComponent; #[async_trait::async_trait] -impl Component for FeesComponent { +impl Genesis for FeesComponent { type AppState = GenesisAppState; #[instrument(name = "FeesComponent::init_chain", skip_all)] @@ -132,20 +126,4 @@ impl Component for FeesComponent { Ok(()) } - - #[instrument(name = "FeesComponent::begin_block", skip_all)] - async fn begin_block( - _state: &mut Arc, - _begin_block: &BeginBlock, - ) -> Result<()> { - Ok(()) - } - - #[instrument(name = "FeesComponent::end_block", skip_all)] - async fn end_block( - _state: &mut Arc, - _end_block: &EndBlock, - ) -> Result<()> { - Ok(()) - } } diff --git a/crates/astria-sequencer/src/fees/mod.rs b/crates/astria-sequencer/src/fees/mod.rs index 2e3547e7fb..f3973dc04c 100644 --- a/crates/astria-sequencer/src/fees/mod.rs +++ b/crates/astria-sequencer/src/fees/mod.rs @@ -38,7 +38,7 @@ use crate::{ }; pub(crate) mod action; -pub(crate) mod component; +pub(crate) mod genesis; pub(crate) mod query; mod state_ext; pub(crate) mod storage; diff --git a/crates/astria-sequencer/src/genesis.rs b/crates/astria-sequencer/src/genesis.rs new file mode 100644 index 0000000000..919e722c12 --- /dev/null +++ b/crates/astria-sequencer/src/genesis.rs @@ -0,0 +1,18 @@ +use astria_eyre::eyre::Result; +use async_trait::async_trait; +use cnidarium::StateWrite; + +/// A component of the Sequencer application's implementation for genesis. +#[async_trait] +pub(crate) trait Genesis { + /// A serialized representation of the component's application state, + /// passed in to [`Genesis::init_chain`]. + type AppState; + + /// Performs initialization, given the genesis state. + /// + /// This method is called once per chain, and should only perform + /// writes, since the backing tree for the [`State`] will + /// be empty. + async fn init_chain(state: S, app_state: &Self::AppState) -> Result<()>; +} diff --git a/crates/astria-sequencer/src/ibc/component.rs b/crates/astria-sequencer/src/ibc/genesis.rs similarity index 57% rename from crates/astria-sequencer/src/ibc/component.rs rename to crates/astria-sequencer/src/ibc/genesis.rs index f8bc969e39..488fde1115 100644 --- a/crates/astria-sequencer/src/ibc/component.rs +++ b/crates/astria-sequencer/src/ibc/genesis.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use astria_core::protocol::genesis::v1::GenesisAppState; use astria_eyre::eyre::{ Result, @@ -9,25 +7,18 @@ use penumbra_ibc::{ component::Ibc, genesis::Content, }; -use tendermint::abci::request::{ - BeginBlock, - EndBlock, -}; use tracing::instrument; use crate::{ - component::Component, - ibc::{ - host_interface::AstriaHost, - state_ext::StateWriteExt, - }, + genesis::Genesis, + ibc::state_ext::StateWriteExt, }; #[derive(Default)] pub(crate) struct IbcComponent; #[async_trait::async_trait] -impl Component for IbcComponent { +impl Genesis for IbcComponent { type AppState = GenesisAppState; #[instrument(name = "IbcComponent::init_chain", skip_all)] @@ -52,22 +43,4 @@ impl Component for IbcComponent { Ok(()) } - - #[instrument(name = "IbcComponent::begin_block", skip_all)] - async fn begin_block( - state: &mut Arc, - begin_block: &BeginBlock, - ) -> Result<()> { - Ibc::begin_block::(state, begin_block).await; - Ok(()) - } - - #[instrument(name = "IbcComponent::end_block", skip_all)] - async fn end_block( - state: &mut Arc, - end_block: &EndBlock, - ) -> Result<()> { - Ibc::end_block(state, end_block).await; - Ok(()) - } } diff --git a/crates/astria-sequencer/src/ibc/mod.rs b/crates/astria-sequencer/src/ibc/mod.rs index 5a94d8b536..bc5026af4c 100644 --- a/crates/astria-sequencer/src/ibc/mod.rs +++ b/crates/astria-sequencer/src/ibc/mod.rs @@ -1,4 +1,4 @@ -pub(crate) mod component; +pub(crate) mod genesis; pub(crate) mod host_interface; pub(crate) mod ibc_relayer_change; pub(crate) mod ics20_transfer; diff --git a/crates/astria-sequencer/src/lib.rs b/crates/astria-sequencer/src/lib.rs index 8701ee6a72..4b2e949257 100644 --- a/crates/astria-sequencer/src/lib.rs +++ b/crates/astria-sequencer/src/lib.rs @@ -9,9 +9,9 @@ pub(crate) mod benchmark_and_test_utils; pub(crate) mod benchmark_utils; pub(crate) mod bridge; mod build_info; -pub(crate) mod component; pub mod config; pub(crate) mod fees; +pub(crate) mod genesis; pub(crate) mod grpc; pub(crate) mod ibc; mod mempool; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 057fa091eb..82a4cfad2f 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -20,7 +20,10 @@ use tracing::{ Instrument, }; -use crate::app::App; +use crate::app::{ + app_abci::AppAbci, + App, +}; pub(crate) struct Consensus { queue: mpsc::Receiver>, diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index 2baa7bda6b..6506ff32ea 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -13,6 +13,7 @@ use tendermint::{ use crate::{ app::{ + app_abci::AppAbci as _, benchmark_and_test_utils::genesis_state, test_utils::MockTxBuilder, App, From 3d27b18e9fff8236dc2aa324a9957a83fb6e03c9 Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Wed, 6 Nov 2024 15:26:39 -0600 Subject: [PATCH 2/3] add call to ibc::begin_block --- crates/astria-sequencer/src/app/app_abci.rs | 6 ++ crates/astria-sequencer/src/app/mod.rs | 60 ++++++++++++++++++- .../astria-sequencer/src/app/tests_app/mod.rs | 4 ++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/crates/astria-sequencer/src/app/app_abci.rs b/crates/astria-sequencer/src/app/app_abci.rs index 7e8eb2450b..fd8ecb4e58 100644 --- a/crates/astria-sequencer/src/app/app_abci.rs +++ b/crates/astria-sequencer/src/app/app_abci.rs @@ -163,6 +163,8 @@ impl AppAbci for App { misbehavior: finalize_block.misbehavior, height, time, + next_validators_hash: finalize_block.next_validators_hash, + proposer_address, }; self.prepare_state_for_execution(block_data) @@ -337,6 +339,8 @@ impl AppAbci for App { misbehavior: prepare_proposal.misbehavior, height: prepare_proposal.height, time: prepare_proposal.time, + next_validators_hash: prepare_proposal.next_validators_hash, + proposer_address: prepare_proposal.proposer_address, }; self.prepare_state_for_execution(block_data) @@ -437,6 +441,8 @@ impl AppAbci for App { misbehavior: process_proposal.misbehavior, height: process_proposal.height, time: process_proposal.time, + next_validators_hash: process_proposal.next_validators_hash, + proposer_address: process_proposal.proposer_address, }; self.prepare_state_for_execution(block_data) diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 80a075b669..af4f7a9989 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -46,6 +46,7 @@ use cnidarium::{ StateWrite, Storage, }; +use penumbra_ibc::component::Ibc; use prost::Message as _; use sha2::{ Digest as _, @@ -59,6 +60,8 @@ use tendermint::{ Event, }, account, + block::Header, + chain::Id, AppHash, Hash, }; @@ -87,6 +90,7 @@ use crate::{ }, fees::StateReadExt as _, grpc::StateWriteExt as _, + ibc::host_interface::AstriaHost, mempool::{ Mempool, RemovalReason, @@ -305,7 +309,8 @@ impl App { } /// sets up the state for execution of the block's transactions. - /// set the current height and timestamp, and removes byzantine validators from state. + /// set the current height and timestamp, removes byzantine validators from state, and calls + /// [`Ibc::begin_block`]. /// /// this *must* be called anytime before a block's txs are executed, whether it's /// during the proposal phase, or finalize_block phase. @@ -314,6 +319,12 @@ impl App { &mut self, block_data: BlockData, ) -> Result<()> { + let chain_id = self + .state + .get_chain_id() + .await + .wrap_err("failed to get chain ID from state")?; + // reset recost flag self.recost_mempool = false; @@ -339,6 +350,16 @@ impl App { .put_validator_set(current_set) .wrap_err("failed putting validator set")?; + let begin_block = create_begin_block(self.app_hash.clone(), chain_id, &block_data); + let mut arc_state_tx = Arc::new(state_tx); + Ibc::begin_block::>>>( + &mut arc_state_tx, + &begin_block, + ) + .await; + let state_tx = Arc::try_unwrap(arc_state_tx) + .expect("components should not retain copies of shared state"); + self.apply(state_tx); Ok(()) @@ -635,6 +656,8 @@ struct BlockData { misbehavior: Vec, height: tendermint::block::Height, time: tendermint::Time, + next_validators_hash: Hash, + proposer_address: account::Id, } #[derive(Clone, Debug)] @@ -922,3 +945,38 @@ fn tx_sequence_data_bytes(tx: &Arc) -> usize { .filter_map(Action::as_rollup_data_submission) .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())) } + +fn create_begin_block( + app_hash: AppHash, + chain_id: Id, + block_data: &BlockData, +) -> abci::request::BeginBlock { + abci::request::BeginBlock { + hash: Hash::default(), // unused + byzantine_validators: block_data.misbehavior.clone(), + header: Header { + app_hash, + chain_id, + consensus_hash: Hash::default(), // unused + data_hash: Some(Hash::default()), // unused + evidence_hash: Some(Hash::default()), // unused + height: block_data.height, + last_block_id: None, // unused + last_commit_hash: Some(Hash::default()), // unused + last_results_hash: Some(Hash::default()), // unused + next_validators_hash: block_data.next_validators_hash, + proposer_address: block_data.proposer_address, + time: block_data.time, + validators_hash: Hash::default(), // unused + version: tendermint::block::header::Version { + // unused + app: 0, + block: 0, + }, + }, + last_commit_info: tendermint::abci::types::CommitInfo { + round: 0u16.into(), // unused + votes: vec![], + }, // unused + } +} diff --git a/crates/astria-sequencer/src/app/tests_app/mod.rs b/crates/astria-sequencer/src/app/tests_app/mod.rs index 1e00dbf14f..7fc062c2dd 100644 --- a/crates/astria-sequencer/src/app/tests_app/mod.rs +++ b/crates/astria-sequencer/src/app/tests_app/mod.rs @@ -109,6 +109,8 @@ async fn app_prepare_state_for_execution() { misbehavior: vec![], height: 1u8.into(), time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: account::Id::try_from([0u8; 20].to_vec()).unwrap(), }; app.prepare_state_for_execution(block_data.clone()) @@ -153,6 +155,8 @@ async fn app_prepare_state_for_execution_remove_byzantine_validators() { misbehavior: vec![misbehavior], height: 1u8.into(), time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: account::Id::try_from([0u8; 20].to_vec()).unwrap(), }; app.prepare_state_for_execution(block_data).await.unwrap(); From d830341a65c474baf6906ad2ffddefdc8f6d5661 Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Thu, 7 Nov 2024 07:27:47 -0600 Subject: [PATCH 3/3] fix every action snapshot --- .../src/app/tests_breaking_changes.rs | 80 +++++++++++++------ 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/crates/astria-sequencer/src/app/tests_breaking_changes.rs b/crates/astria-sequencer/src/app/tests_breaking_changes.rs index 58d3bb064f..18e1c1a90f 100644 --- a/crates/astria-sequencer/src/app/tests_breaking_changes.rs +++ b/crates/astria-sequencer/src/app/tests_breaking_changes.rs @@ -71,7 +71,10 @@ use crate::{ nria, ASTRIA_PREFIX, }, - bridge::StateWriteExt as _, + bridge::{ + StateReadExt, + StateWriteExt as _, + }, proposal::commitment::generate_rollup_datas_commitment, }; @@ -179,6 +182,7 @@ async fn app_execute_transaction_with_every_action_snapshot() { let bridge_address = astria_address(&bridge.address_bytes()); let bob_address = astria_address_from_hex_string(BOB_ADDRESS); let carol_address = astria_address_from_hex_string(CAROL_ADDRESS); + let mut signed_txs = vec![]; let accounts = { let mut acc = default_genesis_accounts(); @@ -225,7 +229,8 @@ async fn app_execute_transaction_with_every_action_snapshot() { ]) .chain_id("test") .try_build() - .unwrap(); + .unwrap() + .sign(&alice); let tx_bundleable_sudo = TransactionBody::builder() .actions(vec![ @@ -239,7 +244,8 @@ async fn app_execute_transaction_with_every_action_snapshot() { .nonce(1) .chain_id("test") .try_build() - .unwrap(); + .unwrap() + .sign(&alice); let tx_sudo_ibc = TransactionBody::builder() .actions(vec![ @@ -251,7 +257,8 @@ async fn app_execute_transaction_with_every_action_snapshot() { .nonce(2) .chain_id("test") .try_build() - .unwrap(); + .unwrap() + .sign(&alice); let tx_sudo = TransactionBody::builder() .actions(vec![ @@ -263,23 +270,26 @@ async fn app_execute_transaction_with_every_action_snapshot() { .nonce(3) .chain_id("test") .try_build() - .unwrap(); + .unwrap() + .sign(&alice); - let signed_tx_general_bundleable = Arc::new(tx_bundleable_general.sign(&alice)); - app.execute_transaction(signed_tx_general_bundleable) + signed_txs.insert(0, tx_bundleable_general.clone()); + app.execute_transaction(Arc::new(tx_bundleable_general)) .await .unwrap(); - let signed_tx_sudo_bundleable = Arc::new(tx_bundleable_sudo.sign(&alice)); - app.execute_transaction(signed_tx_sudo_bundleable) + signed_txs.insert(0, tx_bundleable_sudo.clone()); + app.execute_transaction(Arc::new(tx_bundleable_sudo)) .await .unwrap(); - let signed_tx_sudo_ibc = Arc::new(tx_sudo_ibc.sign(&alice)); - app.execute_transaction(signed_tx_sudo_ibc).await.unwrap(); + signed_txs.insert(0, tx_sudo_ibc.clone()); + app.execute_transaction(Arc::new(tx_sudo_ibc)) + .await + .unwrap(); - let signed_tx_sudo = Arc::new(tx_sudo.sign(&alice)); - app.execute_transaction(signed_tx_sudo).await.unwrap(); + signed_txs.insert(0, tx_sudo.clone()); + app.execute_transaction(Arc::new(tx_sudo)).await.unwrap(); let tx = TransactionBody::builder() .actions(vec![ @@ -294,9 +304,10 @@ async fn app_execute_transaction_with_every_action_snapshot() { ]) .chain_id("test") .try_build() - .unwrap(); - let signed_tx = Arc::new(tx.sign(&bridge)); - app.execute_transaction(signed_tx).await.unwrap(); + .unwrap() + .sign(&bridge); + signed_txs.insert(0, tx.clone()); + app.execute_transaction(Arc::new(tx)).await.unwrap(); let tx_bridge_bundleable = TransactionBody::builder() .actions(vec![ @@ -322,10 +333,13 @@ async fn app_execute_transaction_with_every_action_snapshot() { .nonce(1) .chain_id("test") .try_build() - .unwrap(); + .unwrap() + .sign(&bridge); - let signed_tx = Arc::new(tx_bridge_bundleable.sign(&bridge)); - app.execute_transaction(signed_tx).await.unwrap(); + signed_txs.insert(0, tx_bridge_bundleable.clone()); + app.execute_transaction(Arc::new(tx_bridge_bundleable)) + .await + .unwrap(); let tx_bridge = TransactionBody::builder() .actions(vec![ @@ -340,10 +354,30 @@ async fn app_execute_transaction_with_every_action_snapshot() { .nonce(2) .chain_id("test") .try_build() - .unwrap(); - - let signed_tx = Arc::new(tx_bridge.sign(&bridge)); - app.execute_transaction(signed_tx).await.unwrap(); + .unwrap() + .sign(&bridge); + + signed_txs.insert(0, tx_bridge.clone()); + app.execute_transaction(Arc::new(tx_bridge)).await.unwrap(); + + let cached_deposits = app.state.get_cached_block_deposits(); + let commitments = generate_rollup_datas_commitment(&signed_txs, cached_deposits.clone()); + + app.update_state_and_end_block( + Hash::Sha256([0u8; 32]), + 1u32.into(), + Time::now(), + [0; 20].to_vec().try_into().unwrap(), + commitments.into_transactions( + signed_txs + .iter() + .map(|tx| tx.to_raw().encode_to_vec().into()) + .collect(), + ), + vec![], + ) + .await + .unwrap(); app.prepare_commit(storage.clone()).await.unwrap(); app.commit(storage.clone()).await;