diff --git a/Cargo.lock b/Cargo.lock index 063e3a9d2b..74eec93164 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,7 @@ dependencies = [ "astria-config", "astria-core", "astria-eyre", + "astria-grpc-mock", "astria-sequencer-client", "astria-telemetry", "astria-test-utils", diff --git a/charts/composer/Chart.yaml b/charts/composer/Chart.yaml index 26b0666774..dd68073389 100644 --- a/charts/composer/Chart.yaml +++ b/charts/composer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.5 +version: 0.1.6 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/composer/templates/configmap.yaml b/charts/composer/templates/configmap.yaml index 3d9b13a739..d11e419eaf 100644 --- a/charts/composer/templates/configmap.yaml +++ b/charts/composer/templates/configmap.yaml @@ -8,7 +8,6 @@ data: ASTRIA_COMPOSER_API_LISTEN_ADDR: "0.0.0.0:{{ .Values.ports.healthApi }}" ASTRIA_COMPOSER_GRPC_ADDR: "0.0.0.0:{{ .Values.ports.grpc }}" ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.sequencerChainId . }}" - ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}" ASTRIA_COMPOSER_ROLLUPS: "{{ include "composer.rollups" . }}" ASTRIA_COMPOSER_PRIVATE_KEY_FILE: "/var/secrets/{{ .Values.config.privateKey.secret.filename }}" ASTRIA_COMPOSER_MAX_BYTES_PER_BUNDLE: "{{ .Values.config.maxBytesPerBundle }}" @@ -30,7 +29,10 @@ data: OTEL_EXPORTER_OTLP_TRACE_HEADERS: "{{ tpl .Values.otel.traceHeaders . }}" OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceName . }}" {{- if not .Values.global.dev }} + ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}" {{- else }} + ASTRIA_COMPOSER_SEQUENCER_ABCI_ENDPOINT: "{{ tpl .Values.config.sequencerRpc . }}" + ASTRIA_COMPOSER_SEQUENCER_GRPC_ENDPOINT: "{{ tpl .Values.config.sequencerGrpc . }}" {{- end }} --- {{- if not .Values.secretProvider.enabled }} diff --git a/charts/composer/values.yaml b/charts/composer/values.yaml index 95e79f9185..7dd3f3ec1a 100644 --- a/charts/composer/values.yaml +++ b/charts/composer/values.yaml @@ -20,6 +20,7 @@ config: sequencerAddressPrefix: astria sequencerNativeAssetBaseDenomination: "nria" sequencerRpc: "" + sequencerGrpc: "" sequencerChainId: "" privateKey: devContent: "" diff --git a/charts/evm-stack/Chart.lock b/charts/evm-stack/Chart.lock index 59d9b0d782..f9ed82cec8 100644 --- a/charts/evm-stack/Chart.lock +++ b/charts/evm-stack/Chart.lock @@ -7,7 +7,7 @@ dependencies: version: 0.27.6 - name: composer repository: file://../composer - version: 0.1.5 + version: 0.1.6 - name: evm-faucet repository: file://../evm-faucet version: 0.1.2 @@ -20,5 +20,5 @@ dependencies: - name: blockscout-stack repository: https://blockscout.github.io/helm-charts version: 1.6.2 -digest: sha256:0428a6d56fd86c170e322ad79c7b5f87628b6187a1df5ad47ae7c2281b7f12da -generated: "2024-10-14T15:11:45.153501+02:00" +digest: sha256:80a70740a70f834b6ff6cdcfbb5f4a3504d6963f784ff678d1d52a7284b1dc20 +generated: "2024-10-14T16:04:40.995885+02:00" diff --git a/charts/evm-stack/Chart.yaml b/charts/evm-stack/Chart.yaml index b208660420..24ce1d35c1 100644 --- a/charts/evm-stack/Chart.yaml +++ b/charts/evm-stack/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.6.3 +version: 0.6.4 dependencies: - name: celestia-node @@ -26,7 +26,7 @@ dependencies: version: 0.27.6 repository: "file://../evm-rollup" - name: composer - version: 0.1.5 + version: 0.1.6 repository: "file://../composer" condition: composer.enabled - name: evm-faucet diff --git a/crates/astria-composer/Cargo.toml b/crates/astria-composer/Cargo.toml index c17567aaac..7039b15af1 100644 --- a/crates/astria-composer/Cargo.toml +++ b/crates/astria-composer/Cargo.toml @@ -13,7 +13,11 @@ name = "astria-composer" [dependencies] astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["serde", "server"] } +astria-core = { path = "../astria-core", features = [ + "client", + "serde", + "server", +] } astria-eyre = { path = "../astria-eyre" } config = { package = "astria-config", path = "../astria-config" } telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [ @@ -59,6 +63,7 @@ path = "../astria-sequencer-client" features = ["http"] [dev-dependencies] +astria-grpc-mock = { path = "../astria-grpc-mock" } config = { package = "astria-config", path = "../astria-config", features = [ "tests", ] } diff --git a/crates/astria-composer/local.env.example b/crates/astria-composer/local.env.example index 8936089de1..eb37561857 100644 --- a/crates/astria-composer/local.env.example +++ b/crates/astria-composer/local.env.example @@ -24,8 +24,11 @@ NO_COLOR= # Address of the API server ASTRIA_COMPOSER_API_LISTEN_ADDR="0.0.0.0:0" -# Address of the RPC server for the sequencer chain -ASTRIA_COMPOSER_SEQUENCER_URL="http://127.0.0.1:26657" +# Address of the ABCI server for the sequencer chain +ASTRIA_COMPOSER_SEQUENCER_ABCI_ENDPOINT="http://127.0.0.1:26657" + +# Address of the gRPC server for the sequencer chain +ASTRIA_COMPOSER_SEQUENCER_GRPC_ENDPOINT="http://127.0.0.1:8080" # Chain ID of the sequencer chain which transactions are submitted to. ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID="astria-dev-1" diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index dc9ab4ce17..4a6e5b774d 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -124,7 +124,8 @@ impl Composer { let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), + sequencer_abci_endpoint: cfg.sequencer_abci_endpoint.clone(), + sequencer_grpc_endpoint: cfg.sequencer_grpc_endpoint.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), private_key_file: cfg.private_key_file.clone(), sequencer_address_prefix: cfg.sequencer_address_prefix.clone(), diff --git a/crates/astria-composer/src/config.rs b/crates/astria-composer/src/config.rs index b3f127d9ab..227dd2c2a6 100644 --- a/crates/astria-composer/src/config.rs +++ b/crates/astria-composer/src/config.rs @@ -26,8 +26,11 @@ pub struct Config { /// Address of the API server pub api_listen_addr: SocketAddr, - /// Address of the RPC server for the sequencer chain - pub sequencer_url: String, + /// Address of the ABCI server for the sequencer chain + pub sequencer_abci_endpoint: String, + + /// Address of the GRPC server for the sequencer chain + pub sequencer_grpc_endpoint: String, /// The chain ID of the sequencer chain pub sequencer_chain_id: String, diff --git a/crates/astria-composer/src/executor/builder.rs b/crates/astria-composer/src/executor/builder.rs index b84ef794d6..e8b1e972b7 100644 --- a/crates/astria-composer/src/executor/builder.rs +++ b/crates/astria-composer/src/executor/builder.rs @@ -6,6 +6,7 @@ use std::{ use astria_core::{ crypto::SigningKey, + generated::sequencerblock::v1alpha1::sequencer_service_client::SequencerServiceClient, primitive::v1::Address, protocol::transaction::v1alpha1::action::Sequence, }; @@ -24,7 +25,8 @@ use crate::{ }; pub(crate) struct Builder { - pub(crate) sequencer_url: String, + pub(crate) sequencer_abci_endpoint: String, + pub(crate) sequencer_grpc_endpoint: String, pub(crate) sequencer_chain_id: String, pub(crate) private_key_file: String, pub(crate) sequencer_address_prefix: String, @@ -38,7 +40,8 @@ pub(crate) struct Builder { impl Builder { pub(crate) fn build(self) -> eyre::Result<(super::Executor, executor::Handle)> { let Self { - sequencer_url, + sequencer_abci_endpoint, + sequencer_grpc_endpoint, sequencer_chain_id, private_key_file, sequencer_address_prefix, @@ -48,8 +51,14 @@ impl Builder { shutdown_token, metrics, } = self; - let sequencer_client = sequencer_client::HttpClient::new(sequencer_url.as_str()) - .wrap_err("failed constructing sequencer client")?; + let abci_client = sequencer_client::HttpClient::new(sequencer_abci_endpoint.as_str()) + .wrap_err("failed constructing sequencer http client")?; + + let grpc_client = + connect_sequencer_grpc(sequencer_grpc_endpoint.as_str()).wrap_err_with(|| { + format!("failed to connect to sequencer over gRPC at `{sequencer_grpc_endpoint}`") + })?; + let (status, _) = watch::channel(Status::new()); let sequencer_key = read_signing_key_from_file(&private_key_file).wrap_err_with(|| { @@ -69,7 +78,8 @@ impl Builder { super::Executor { status, serialized_rollup_transactions: serialized_rollup_transaction_rx, - sequencer_client, + abci_client, + grpc_client, sequencer_chain_id, sequencer_key, address: sequencer_address, @@ -91,3 +101,14 @@ fn read_signing_key_from_file>(path: P) -> eyre::Result eyre::Result> { + let uri: tonic::transport::Uri = grpc_endpoint + .parse() + .wrap_err("failed to parse endpoint as URI")?; + Ok(SequencerServiceClient::new( + tonic::transport::Endpoint::from(uri).connect_lazy(), + )) +} diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index f1512d43b7..45b0efdc22 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -12,6 +12,13 @@ use std::{ use astria_core::{ crypto::SigningKey, + generated::sequencerblock::v1alpha1::{ + sequencer_service_client::{ + self, + SequencerServiceClient, + }, + GetPendingNonceRequest, + }, protocol::{ abci::AbciErrorCode, transaction::v1alpha1::{ @@ -64,6 +71,7 @@ use tokio::{ }, }; use tokio_util::sync::CancellationToken; +use tonic::transport::Channel; use tracing::{ debug, error, @@ -89,8 +97,6 @@ use crate::{ mod bundle_factory; pub(crate) mod builder; -#[cfg(test)] -mod tests; pub(crate) use builder::Builder; @@ -122,8 +128,10 @@ pub(super) struct Executor { // Channel for receiving `SequenceAction`s to be bundled. serialized_rollup_transactions: mpsc::Receiver, // The client for submitting wrapped and signed pending eth transactions to the astria - // sequencer. - sequencer_client: sequencer_client::HttpClient, + // sequencer via the ABCI client. + abci_client: sequencer_client::HttpClient, + // The grpc client for grabbing the latest nonce from. + grpc_client: sequencer_service_client::SequencerServiceClient, // The chain id used for submission of transactions to the sequencer. sequencer_chain_id: String, // Private key used to sign sequencer transactions @@ -197,7 +205,8 @@ impl Executor { metrics: &'static Metrics, ) -> Fuse> { SubmitFut { - client: self.sequencer_client.clone(), + abci_client: self.abci_client.clone(), + grpc_client: self.grpc_client.clone(), address: self.address, nonce, chain_id: self.sequencer_chain_id.clone(), @@ -332,7 +341,7 @@ impl Executor { self.ensure_chain_id_is_correct() .await .wrap_err("failed to validate chain id")?; - let nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics) + let nonce = get_pending_nonce(self.grpc_client.clone(), self.address, self.metrics) .await .wrap_err("failed getting initial nonce from sequencer")?; Ok(nonce) @@ -378,10 +387,9 @@ impl Executor { futures::future::ready(()) }, ); - let client_genesis: tendermint::Genesis = - tryhard::retry_fn(|| self.sequencer_client.genesis()) - .with_config(retry_config) - .await?; + let client_genesis: tendermint::Genesis = tryhard::retry_fn(|| self.abci_client.genesis()) + .with_config(retry_config) + .await?; Ok(client_genesis.chain_id) } @@ -460,23 +468,21 @@ impl Executor { } } -/// Queries the sequencer for the latest nonce with an exponential backoff -#[instrument(name = "get latest nonce", skip_all, fields(%address), err)] -async fn get_latest_nonce( - client: sequencer_client::HttpClient, +/// Queries the sequencer for the latest pending nonce with an exponential backoff +#[instrument(name = "get pending nonce", skip_all, fields(%address), err)] +async fn get_pending_nonce( + client: sequencer_service_client::SequencerServiceClient, address: Address, metrics: &Metrics, ) -> eyre::Result { - debug!("fetching latest nonce from sequencer"); + debug!("fetching pending nonce from sequencer"); let span = Span::current(); let start = Instant::now(); let retry_config = tryhard::RetryFutureConfig::new(1024) .exponential_backoff(Duration::from_millis(200)) .max_delay(Duration::from_secs(60)) .on_retry( - |attempt, - next_delay: Option, - err: &sequencer_client::extension_trait::Error| { + |attempt, next_delay: Option, err: &tonic::Status| { metrics.increment_nonce_fetch_failure_count(); let wait_duration = next_delay @@ -493,14 +499,22 @@ async fn get_latest_nonce( }, ); let res = tryhard::retry_fn(|| { - let client = client.clone(); - let span = info_span!(parent: span.clone(), "attempt get nonce"); + let mut client = client.clone(); + let span = info_span!(parent: span.clone(), "attempt get pending nonce"); metrics.increment_nonce_fetch_count(); - async move { client.get_latest_nonce(address).await.map(|rsp| rsp.nonce) }.instrument(span) + async move { + client + .get_pending_nonce(GetPendingNonceRequest { + address: Some(address.into_raw()), + }) + .await + .map(|rsp| rsp.into_inner().inner) + } + .instrument(span) }) .with_config(retry_config) .await - .wrap_err("failed getting latest nonce from sequencer after 1024 attempts"); + .wrap_err("failed getting pending nonce from sequencer after 1024 attempts"); metrics.record_nonce_fetch_latency(start.elapsed()); @@ -637,7 +651,8 @@ pin_project! { /// If the sequencer returned a non-zero abci code (albeit not `INVALID_NONCE`), this future will return with /// that nonce it used to submit the non-zero abci code request. struct SubmitFut { - client: sequencer_client::HttpClient, + abci_client: sequencer_client::HttpClient, + grpc_client: SequencerServiceClient, address: Address, chain_id: String, nonce: u32, @@ -670,6 +685,8 @@ impl Future for SubmitFut { // FIXME (https://github.com/astriaorg/astria/issues/1572): This function is too long and should be refactored. fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { const INVALID_NONCE: Code = Code::Err(AbciErrorCode::INVALID_NONCE.value()); + const NONCE_TAKEN: Code = Code::Err(AbciErrorCode::NONCE_TAKEN.value()); + loop { let this = self.as_mut().project(); @@ -686,7 +703,7 @@ impl Future for SubmitFut { "submitting transaction to sequencer", ); SubmitState::WaitingForSend { - fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(), + fut: submit_tx(this.abci_client.clone(), tx, self.metrics).boxed(), } } @@ -708,14 +725,14 @@ impl Future for SubmitFut { .checked_add(1) .expect("nonce should not overflow"))); } - INVALID_NONCE => { + INVALID_NONCE | NONCE_TAKEN => { info!( "sequencer rejected transaction due to invalid nonce; fetching \ new nonce" ); SubmitState::WaitingForNonce { - fut: get_latest_nonce( - this.client.clone(), + fut: get_pending_nonce( + this.grpc_client.clone(), *this.address, self.metrics, ) @@ -759,7 +776,7 @@ impl Future for SubmitFut { "resubmitting transaction to sequencer with new nonce", ); SubmitState::WaitingForSend { - fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(), + fut: submit_tx(this.abci_client.clone(), tx, self.metrics).boxed(), } } Err(error) => { diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs deleted file mode 100644 index 5abb7c6ff7..0000000000 --- a/crates/astria-composer/src/executor/tests.rs +++ /dev/null @@ -1,647 +0,0 @@ -use std::{ - io::Write, - net::{ - IpAddr, - SocketAddr, - }, - sync::LazyLock, - time::Duration, -}; - -use astria_core::{ - generated::protocol::accounts::v1alpha1::NonceResponse, - primitive::v1::{ - asset::{ - Denom, - IbcPrefixed, - }, - RollupId, - ROLLUP_ID_LEN, - }, - protocol::transaction::v1alpha1::action::Sequence, -}; -use astria_eyre::eyre; -use prost::{ - bytes::Bytes, - Message as _, -}; -use sequencer_client::SignedTransaction; -use serde_json::json; -use telemetry::Metrics as _; -use tempfile::NamedTempFile; -use tendermint::{ - consensus::{ - params::{ - AbciParams, - ValidatorParams, - }, - Params, - }, - Genesis, - Time, -}; -use tendermint_rpc::{ - endpoint::broadcast::tx_sync, - request, - response, - Id, -}; -use tokio::{ - sync::watch, - time, -}; -use tokio_util::sync::CancellationToken; -use tracing::debug; -use wiremock::{ - matchers::{ - body_partial_json, - body_string_contains, - }, - Mock, - MockGuard, - MockServer, - Request, - ResponseTemplate, -}; - -use crate::{ - executor, - executor::EnsureChainIdError, - metrics::Metrics, - test_utils::sequence_action_of_max_size, - Config, -}; - -static TELEMETRY: LazyLock<()> = LazyLock::new(|| { - // This config can be meaningless - it's only used inside `try_init` to init the metrics, but we - // haven't configured telemetry to provide metrics here. - let config = Config { - log: String::new(), - api_listen_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0), - sequencer_url: String::new(), - sequencer_chain_id: String::new(), - rollups: String::new(), - private_key_file: String::new(), - sequencer_address_prefix: String::new(), - block_time_ms: 0, - max_bytes_per_bundle: 0, - bundle_queue_capacity: 0, - force_stdout: false, - no_otel: false, - no_metrics: false, - metrics_http_listener_addr: String::new(), - pretty_print: false, - grpc_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0), - fee_asset: Denom::IbcPrefixed(IbcPrefixed::new([0; 32])), - }; - if std::env::var_os("TEST_LOG").is_some() { - let filter_directives = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()); - telemetry::configure() - .set_no_otel(true) - .set_stdout_writer(std::io::stdout) - .set_filter_directives(&filter_directives) - .try_init::(&config) - .unwrap(); - } else { - telemetry::configure() - .set_no_otel(true) - .set_stdout_writer(std::io::sink) - .try_init::(&config) - .unwrap(); - } -}); - -fn sequence_action() -> Sequence { - Sequence { - rollup_id: RollupId::new([0; ROLLUP_ID_LEN]), - data: Bytes::new(), - fee_asset: "nria".parse().unwrap(), - } -} - -/// Start a mock sequencer server and mount a mock for the `accounts/nonce` query. -async fn setup() -> (MockServer, Config, NamedTempFile) { - LazyLock::force(&TELEMETRY); - let server = MockServer::start().await; - - let keyfile = NamedTempFile::new().unwrap(); - (&keyfile) - .write_all("2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90".as_bytes()) - .unwrap(); - - let cfg = Config { - log: String::new(), - api_listen_addr: "127.0.0.1:0".parse().unwrap(), - rollups: String::new(), - sequencer_url: server.uri(), - sequencer_chain_id: "test-chain-1".to_string(), - private_key_file: keyfile.path().to_string_lossy().to_string(), - sequencer_address_prefix: "astria".into(), - block_time_ms: 2000, - max_bytes_per_bundle: 1000, - bundle_queue_capacity: 10, - no_otel: false, - force_stdout: false, - no_metrics: false, - metrics_http_listener_addr: String::new(), - pretty_print: true, - grpc_addr: "127.0.0.1:0".parse().unwrap(), - fee_asset: "nria".parse().unwrap(), - }; - (server, cfg, keyfile) -} - -/// Assert that given error is of correct type and contains the expected chain IDs. -#[track_caller] -fn assert_chain_id_err( - err: &EnsureChainIdError, - configured_expected: &str, - configured_actual: &tendermint::chain::Id, -) { - match err { - EnsureChainIdError::WrongChainId { - expected, - actual, - } => { - assert_eq!(*expected, configured_expected); - assert_eq!(*actual, *configured_actual); - } - other @ EnsureChainIdError::GetChainId(_) => { - panic!("expected `EnsureChainIdError::WrongChainId`, but got '{other:?}'") - } - } -} - -/// Mount a mock for the `abci_query` endpoint. -async fn mount_default_nonce_query_mock(server: &MockServer) -> MockGuard { - let query_path = "accounts/nonce"; - let response = NonceResponse { - height: 0, - nonce: 0, - }; - let expected_body = json!({ - "method": "abci_query" - }); - let response = tendermint_rpc::endpoint::abci_query::Response { - response: tendermint_rpc::endpoint::abci_query::AbciQuery { - value: response.encode_to_vec(), - ..Default::default() - }, - }; - let wrapper = response::Wrapper::new_with_id(Id::Num(1), Some(response), None); - Mock::given(body_partial_json(&expected_body)) - .and(body_string_contains(query_path)) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await -} - -/// Convert a `Request` object to a `SignedTransaction` -fn signed_tx_from_request(request: &Request) -> SignedTransaction { - use astria_core::generated::protocol::transactions::v1alpha1::SignedTransaction as RawSignedTransaction; - use prost::Message as _; - - let wrapped_tx_sync_req: request::Wrapper = - serde_json::from_slice(&request.body) - .expect("can't deserialize to JSONRPC wrapped tx_sync::Request"); - let raw_signed_tx = RawSignedTransaction::decode(&*wrapped_tx_sync_req.params().tx) - .expect("can't deserialize signed sequencer tx from broadcast jsonrpc request"); - let signed_tx = SignedTransaction::try_from_raw(raw_signed_tx) - .expect("can't convert raw signed tx to checked signed tx"); - debug!(?signed_tx, "sequencer mock received signed transaction"); - - signed_tx -} - -/// Deserializes the bytes contained in a `tx_sync::Request` to a signed sequencer transaction -/// and verifies that the contained sequence action is in the given `expected_rollup_ids` and -/// `expected_nonces`. -async fn mount_broadcast_tx_sync_seq_actions_mock(server: &MockServer) -> MockGuard { - let matcher = move |request: &Request| { - let signed_tx = signed_tx_from_request(request); - let actions = signed_tx.actions(); - - // verify all received actions are sequence actions - actions.iter().all(|action| action.as_sequence().is_some()) - }; - let jsonrpc_rsp = response::Wrapper::new_with_id( - Id::Num(1), - Some(tx_sync::Response { - code: 0.into(), - data: vec![].into(), - log: String::new(), - hash: tendermint::Hash::Sha256([0; 32]), - }), - None, - ); - - Mock::given(matcher) - .respond_with(ResponseTemplate::new(200).set_body_json(&jsonrpc_rsp)) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await -} - -/// Mounts genesis file with specified sequencer chain ID -async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) { - Mock::given(body_partial_json( - json!({"jsonrpc": "2.0", "method": "genesis", "params": null}), - )) - .respond_with(ResponseTemplate::new(200).set_body_json( - tendermint_rpc::response::Wrapper::new_with_id( - tendermint_rpc::Id::uuid_v4(), - Some( - tendermint_rpc::endpoint::genesis::Response:: { - genesis: Genesis { - genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), - chain_id: mock_sequencer_chain_id.try_into().unwrap(), - initial_height: 1, - consensus_params: Params { - block: tendermint::block::Size { - max_bytes: 1024, - max_gas: 1024, - time_iota_ms: 1000, - }, - evidence: tendermint::evidence::Params { - max_age_num_blocks: 1000, - max_age_duration: tendermint::evidence::Duration( - Duration::from_secs(3600), - ), - max_bytes: 1_048_576, - }, - validator: ValidatorParams { - pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519], - }, - version: None, - abci: AbciParams::default(), - }, - validators: vec![], - app_hash: tendermint::hash::AppHash::default(), - app_state: serde_json::Value::Null, - }, - }, - ), - None, - ), - )) - .expect(1..) - .mount(server) - .await; -} - -/// Helper to wait for the executor to connect to the mock sequencer -async fn wait_for_startup( - mut status: watch::Receiver, - nonce_guard: MockGuard, -) -> eyre::Result<()> { - // wait to receive executor status - status - .wait_for(executor::Status::is_connected) - .await - .unwrap(); - - tokio::time::timeout( - Duration::from_millis(100), - nonce_guard.wait_until_satisfied(), - ) - .await - .unwrap(); - - Ok(()) -} - -/// Test to check that the executor sends a signed transaction to the sequencer as soon as it -/// receives a `SequenceAction` that fills it beyond its `max_bundle_size`. -#[tokio::test] -async fn full_bundle() { - // set up the executor, channel for writing seq actions, and the sequencer mock - let (sequencer, cfg, _keyfile) = setup().await; - let shutdown_token = CancellationToken::new(); - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); - mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; - let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), - sequencer_chain_id: cfg.sequencer_chain_id.clone(), - private_key_file: cfg.private_key_file.clone(), - sequencer_address_prefix: "astria".into(), - block_time_ms: cfg.block_time_ms, - max_bytes_per_bundle: cfg.max_bytes_per_bundle, - bundle_queue_capacity: cfg.bundle_queue_capacity, - shutdown_token: shutdown_token.clone(), - metrics, - } - .build() - .unwrap(); - - let nonce_guard = mount_default_nonce_query_mock(&sequencer).await; - let status = executor.subscribe(); - - let _executor_task = tokio::spawn(executor.run_until_stopped()); - // wait for sequencer to get the initial nonce request from sequencer - wait_for_startup(status, nonce_guard).await.unwrap(); - - let response_guard = mount_broadcast_tx_sync_seq_actions_mock(&sequencer).await; - - // send two sequence actions to the executor, the first of which is large enough to fill the - // bundle sending the second should cause the first to immediately be submitted in - // order to make space for the second - let seq0 = sequence_action_of_max_size(cfg.max_bytes_per_bundle); - - let seq1 = Sequence { - rollup_id: RollupId::new([1; ROLLUP_ID_LEN]), - ..sequence_action_of_max_size(cfg.max_bytes_per_bundle) - }; - - // push both sequence actions to the executor in order to force the full bundle to be sent - executor_handle - .send_timeout(seq0.clone(), Duration::from_millis(1000)) - .await - .unwrap(); - executor_handle - .send_timeout(seq1.clone(), Duration::from_millis(1000)) - .await - .unwrap(); - - // wait for the mock sequencer to receive the signed transaction - tokio::time::timeout( - Duration::from_millis(100), - response_guard.wait_until_satisfied(), - ) - .await - .unwrap(); - - // verify only one signed transaction was received by the mock sequencer - // i.e. only the full bundle was sent and not the second one due to the block timer - let expected_seq_actions = [seq0]; - let requests = response_guard.received_requests().await; - assert_eq!(requests.len(), 1); - - // verify the expected sequence actions were received - let signed_tx = signed_tx_from_request(&requests[0]); - let actions = signed_tx.actions(); - - assert_eq!( - actions.len(), - expected_seq_actions.len(), - "received more than one action, one was supposed to fill the bundle" - ); - - for (action, expected_seq_action) in actions.iter().zip(expected_seq_actions.iter()) { - let seq_action = action.as_sequence().unwrap(); - assert_eq!( - seq_action.rollup_id, expected_seq_action.rollup_id, - "chain id does not match. actual {:?} expected {:?}", - seq_action.rollup_id, expected_seq_action.rollup_id - ); - assert_eq!( - seq_action.data, expected_seq_action.data, - "data does not match expected data for action with rollup_id {:?}", - seq_action.rollup_id, - ); - } -} - -/// Test to check that the executor sends a signed transaction to the sequencer after its -/// `block_timer` has ticked -#[tokio::test] -async fn bundle_triggered_by_block_timer() { - // set up the executor, channel for writing seq actions, and the sequencer mock - let (sequencer, cfg, _keyfile) = setup().await; - let shutdown_token = CancellationToken::new(); - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); - mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; - let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), - sequencer_chain_id: cfg.sequencer_chain_id.clone(), - private_key_file: cfg.private_key_file.clone(), - sequencer_address_prefix: "astria".into(), - block_time_ms: cfg.block_time_ms, - max_bytes_per_bundle: cfg.max_bytes_per_bundle, - bundle_queue_capacity: cfg.bundle_queue_capacity, - shutdown_token: shutdown_token.clone(), - metrics, - } - .build() - .unwrap(); - - let nonce_guard = mount_default_nonce_query_mock(&sequencer).await; - let status = executor.subscribe(); - - let _executor_task = tokio::spawn(executor.run_until_stopped()); - - // wait for sequencer to get the initial nonce request from sequencer - wait_for_startup(status, nonce_guard).await.unwrap(); - - let response_guard = mount_broadcast_tx_sync_seq_actions_mock(&sequencer).await; - - // send two sequence actions to the executor, both small enough to fit in a single bundle - // without filling it - let seq0 = Sequence { - data: vec![0u8; cfg.max_bytes_per_bundle / 4].into(), - ..sequence_action() - }; - - // make sure at least one block has passed so that the executor will submit the bundle - // despite it not being full - time::pause(); - executor_handle - .send_timeout(seq0.clone(), Duration::from_millis(1000)) - .await - .unwrap(); - time::advance(Duration::from_millis(cfg.block_time_ms)).await; - time::resume(); - - // wait for the mock sequencer to receive the signed transaction - tokio::time::timeout( - Duration::from_millis(100), - response_guard.wait_until_satisfied(), - ) - .await - .unwrap(); - - // verify only one signed transaction was received by the mock sequencer - let expected_seq_actions = [seq0]; - let requests = response_guard.received_requests().await; - assert_eq!(requests.len(), 1); - - // verify the expected sequence actions were received - let signed_tx = signed_tx_from_request(&requests[0]); - let actions = signed_tx.actions(); - - assert_eq!( - actions.len(), - expected_seq_actions.len(), - "received more than one action, one was supposed to fill the bundle" - ); - - for (action, expected_seq_action) in actions.iter().zip(expected_seq_actions.iter()) { - let seq_action = action.as_sequence().unwrap(); - assert_eq!( - seq_action.rollup_id, expected_seq_action.rollup_id, - "chain id does not match. actual {:?} expected {:?}", - seq_action.rollup_id, expected_seq_action.rollup_id - ); - assert_eq!( - seq_action.data, expected_seq_action.data, - "data does not match expected data for action with rollup_id {:?}", - seq_action.rollup_id, - ); - } -} - -/// Test to check that the executor sends a signed transaction with two sequence actions to the -/// sequencer. -#[tokio::test] -async fn two_seq_actions_single_bundle() { - // set up the executor, channel for writing seq actions, and the sequencer mock - let (sequencer, cfg, _keyfile) = setup().await; - let shutdown_token = CancellationToken::new(); - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); - mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; - let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), - sequencer_chain_id: cfg.sequencer_chain_id.clone(), - private_key_file: cfg.private_key_file.clone(), - sequencer_address_prefix: "astria".into(), - block_time_ms: cfg.block_time_ms, - max_bytes_per_bundle: cfg.max_bytes_per_bundle, - bundle_queue_capacity: cfg.bundle_queue_capacity, - shutdown_token: shutdown_token.clone(), - metrics, - } - .build() - .unwrap(); - - let nonce_guard = mount_default_nonce_query_mock(&sequencer).await; - let status = executor.subscribe(); - let _executor_task = tokio::spawn(executor.run_until_stopped()); - - // wait for sequencer to get the initial nonce request from sequencer - wait_for_startup(status, nonce_guard).await.unwrap(); - - let response_guard = mount_broadcast_tx_sync_seq_actions_mock(&sequencer).await; - - // send two sequence actions to the executor, both small enough to fit in a single bundle - // without filling it - let seq0 = Sequence { - data: vec![0u8; cfg.max_bytes_per_bundle / 4].into(), - ..sequence_action() - }; - - let seq1 = Sequence { - rollup_id: RollupId::new([1; ROLLUP_ID_LEN]), - data: vec![1u8; cfg.max_bytes_per_bundle / 4].into(), - ..sequence_action() - }; - - // make sure at least one block has passed so that the executor will submit the bundle - // despite it not being full - time::pause(); - executor_handle - .send_timeout(seq0.clone(), Duration::from_millis(1000)) - .await - .unwrap(); - executor_handle - .send_timeout(seq1.clone(), Duration::from_millis(1000)) - .await - .unwrap(); - time::advance(Duration::from_millis(cfg.block_time_ms)).await; - time::resume(); - - // wait for the mock sequencer to receive the signed transaction - tokio::time::timeout( - Duration::from_millis(100), - response_guard.wait_until_satisfied(), - ) - .await - .unwrap(); - - // verify only one signed transaction was received by the mock sequencer - let expected_seq_actions = [seq0, seq1]; - let requests = response_guard.received_requests().await; - assert_eq!(requests.len(), 1); - - // verify the expected sequence actions were received - let signed_tx = signed_tx_from_request(&requests[0]); - let actions = signed_tx.actions(); - - assert_eq!( - actions.len(), - expected_seq_actions.len(), - "received more than one action, one was supposed to fill the bundle" - ); - - for (action, expected_seq_action) in actions.iter().zip(expected_seq_actions.iter()) { - let seq_action = action.as_sequence().unwrap(); - assert_eq!( - seq_action.rollup_id, expected_seq_action.rollup_id, - "chain id does not match. actual {:?} expected {:?}", - seq_action.rollup_id, expected_seq_action.rollup_id - ); - assert_eq!( - seq_action.data, expected_seq_action.data, - "data does not match expected data for action with rollup_id {:?}", - seq_action.rollup_id, - ); - } -} - -/// Test to check that executor's chain ID check is properly checked against the sequencer's chain -/// ID -#[tokio::test] -async fn chain_id_mismatch_returns_error() { - use tendermint::chain::Id; - - // set up sequencer mock - let (sequencer, cfg, _keyfile) = setup().await; - let shutdown_token = CancellationToken::new(); - let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); - - // mount a status response with an incorrect chain_id - mount_genesis(&sequencer, "bad-chain-id").await; - - // build the executor with the correct chain_id - let (executor, _executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), - sequencer_chain_id: cfg.sequencer_chain_id.clone(), - private_key_file: cfg.private_key_file.clone(), - sequencer_address_prefix: cfg.sequencer_address_prefix.clone(), - block_time_ms: cfg.block_time_ms, - max_bytes_per_bundle: cfg.max_bytes_per_bundle, - bundle_queue_capacity: cfg.bundle_queue_capacity, - shutdown_token: shutdown_token.clone(), - metrics, - } - .build() - .unwrap(); - - // ensure that run_until_stopped returns WrongChainId error - let err = executor.run_until_stopped().await.expect_err( - "should exit with an error when reading a bad chain ID, but exited with success", - ); - let mut found = false; - for cause in err.chain() { - if let Some(err) = cause.downcast_ref::() { - assert_chain_id_err( - err, - &cfg.sequencer_chain_id, - &Id::try_from("bad-chain-id".to_string()).unwrap(), - ); - found = true; - break; - } - } - - // ensure that the error chain contains the expected error - assert!( - found, - "expected `EnsureChainIdError::WrongChainId` in error chain, but it was not found" - ); -} diff --git a/crates/astria-composer/tests/blackbox/geth_collector.rs b/crates/astria-composer/tests/blackbox/geth_collector.rs index e6bce832e9..8acf6c8178 100644 --- a/crates/astria-composer/tests/blackbox/geth_collector.rs +++ b/crates/astria-composer/tests/blackbox/geth_collector.rs @@ -1,9 +1,6 @@ use std::time::Duration; -use astria_core::{ - generated::protocol::accounts::v1alpha1::NonceResponse, - primitive::v1::RollupId, -}; +use astria_core::primitive::v1::RollupId; use ethers::types::Transaction; use crate::helper::{ @@ -19,12 +16,6 @@ async fn tx_from_one_rollup_is_received_by_sequencer() { // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; let mock_guard = @@ -47,12 +38,6 @@ async fn collector_restarts_after_exit() { // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); // get rollup node let rollup_node = test_composer.rollup_nodes.get("test1").unwrap(); @@ -84,17 +69,9 @@ async fn collector_restarts_after_exit() { #[tokio::test] async fn invalid_nonce_causes_resubmission_under_different_nonce() { - use crate::helper::mock_sequencer::mount_abci_query_mock; - // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); // Reject the first transaction for invalid nonce let invalid_nonce_guard = mount_broadcast_tx_sync_invalid_nonce_mock( @@ -103,22 +80,17 @@ async fn invalid_nonce_causes_resubmission_under_different_nonce() { ) .await; - // Mount a response of 0 to a nonce query - let nonce_refetch_guard = mount_abci_query_mock( - &test_composer.sequencer, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 1, - }, - ) - .await; - let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; // Expect nonce 1 again so that the resubmitted tx is accepted let valid_nonce_guard = mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![1]).await; + // Mount a response of 1 to a nonce query + test_composer + .sequencer_mock + .mount_pending_nonce_response(1, "setup correct nonce") + .await; + // Push a tx to the rollup node so that it is picked up by the composer and submitted with the // stored nonce of 0, triggering the nonce refetch process test_composer.rollup_nodes["test1"] @@ -135,10 +107,49 @@ async fn invalid_nonce_causes_resubmission_under_different_nonce() { tokio::time::timeout( Duration::from_millis(100), - nonce_refetch_guard.wait_until_satisfied(), + valid_nonce_guard.wait_until_satisfied(), ) .await - .expect("new nonce should have been fetched from the sequencer"); + .expect("sequencer tx should have been accepted after nonce refetch"); +} + +#[tokio::test] +async fn nonce_taken_causes_resubmission_under_different_nonce() { + // Spawn a composer with a mock sequencer and a mock rollup node + // Initial nonce is 0 + let test_composer = spawn_composer(&["test1"]).await; + + // Reject the first transaction for taken nonce + let invalid_nonce_guard = mount_broadcast_tx_sync_invalid_nonce_mock( + &test_composer.sequencer, + RollupId::from_unhashed_bytes("test1"), + ) + .await; + + let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; + // Expect nonce 1 again so that the resubmitted tx is accepted + let valid_nonce_guard = + mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![1]).await; + + // Mount a response of 1 to a nonce query + test_composer + .sequencer_mock + .mount_pending_nonce_response(1, "setup correct nonce") + .await; + + // Push a tx to the rollup node so that it is picked up by the composer and submitted with the + // stored nonce of 0, triggering the nonce refetch process + test_composer.rollup_nodes["test1"] + .push_tx(Transaction::default()) + .unwrap(); + + // wait for 1 sequencer block time to make sure the bundle is preempted + tokio::time::timeout( + Duration::from_millis(test_composer.cfg.block_time_ms + 1000), + invalid_nonce_guard.wait_until_satisfied(), + ) + .await + .expect("sequencer tx should have been rejected due to invalid nonce"); tokio::time::timeout( Duration::from_millis(100), @@ -153,12 +164,6 @@ async fn single_rollup_tx_payload_integrity() { // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); let tx: Transaction = serde_json::from_str(TEST_ETH_TX_JSON).unwrap(); let mock_guard = diff --git a/crates/astria-composer/tests/blackbox/grpc_collector.rs b/crates/astria-composer/tests/blackbox/grpc_collector.rs index 8a4a963acc..bc0586a073 100644 --- a/crates/astria-composer/tests/blackbox/grpc_collector.rs +++ b/crates/astria-composer/tests/blackbox/grpc_collector.rs @@ -1,12 +1,9 @@ use std::time::Duration; use astria_core::{ - generated::{ - composer::v1alpha1::{ - grpc_collector_service_client::GrpcCollectorServiceClient, - SubmitRollupTransactionRequest, - }, - protocol::accounts::v1alpha1::NonceResponse, + generated::composer::v1alpha1::{ + grpc_collector_service_client::GrpcCollectorServiceClient, + SubmitRollupTransactionRequest, }, primitive::v1::RollupId, }; @@ -24,13 +21,6 @@ use crate::helper::{ #[tokio::test] async fn tx_from_one_rollup_is_received_by_sequencer() { let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer were not setup successfully"); - let rollup_id = RollupId::from_unhashed_bytes("test1"); let expected_chain_ids = vec![rollup_id]; let mock_guard = @@ -63,33 +53,20 @@ async fn tx_from_one_rollup_is_received_by_sequencer() { #[tokio::test] async fn invalid_nonce_causes_resubmission_under_different_nonce() { - use crate::helper::mock_sequencer::mount_abci_query_mock; - // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let rollup_id = RollupId::from_unhashed_bytes("test1"); let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); // Reject the first transaction for invalid nonce let invalid_nonce_guard = mount_broadcast_tx_sync_invalid_nonce_mock(&test_composer.sequencer, rollup_id).await; - // Mount a response of 0 to a nonce query - let nonce_refetch_guard = mount_abci_query_mock( - &test_composer.sequencer, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 1, - }, - ) - .await; + // Mount a response of 1 to a nonce query + test_composer + .sequencer_mock + .mount_pending_nonce_response(1, "setup correct nonce") + .await; let expected_chain_ids = vec![rollup_id]; // Expect nonce 1 again so that the resubmitted tx is accepted @@ -122,13 +99,6 @@ async fn invalid_nonce_causes_resubmission_under_different_nonce() { .await .expect("sequencer tx should have been rejected due to invalid nonce"); - tokio::time::timeout( - Duration::from_millis(100), - nonce_refetch_guard.wait_until_satisfied(), - ) - .await - .expect("new nonce should have been fetched from the sequencer"); - tokio::time::timeout( Duration::from_millis(100), valid_nonce_guard.wait_until_satisfied(), @@ -143,12 +113,6 @@ async fn single_rollup_tx_payload_integrity() { // Initial nonce is 0 let rollup_id = RollupId::from_unhashed_bytes("test1"); let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); let tx: Transaction = serde_json::from_str(TEST_ETH_TX_JSON).unwrap(); let mock_guard = diff --git a/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs b/crates/astria-composer/tests/blackbox/helper/mock_abci_sequencer.rs similarity index 63% rename from crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs rename to crates/astria-composer/tests/blackbox/helper/mock_abci_sequencer.rs index da8dea87e0..28f5ca53a2 100644 --- a/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs +++ b/crates/astria-composer/tests/blackbox/helper/mock_abci_sequencer.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use prost::Message; use serde_json::json; use tendermint::{ consensus::{ @@ -13,63 +12,17 @@ use tendermint::{ Genesis, Time, }; -use tendermint_rpc::{ - response, - Id, -}; use wiremock::{ - matchers::{ - body_partial_json, - body_string_contains, - }, + matchers::body_partial_json, Mock, - MockGuard, MockServer, ResponseTemplate, }; -pub async fn start() -> (MockServer, MockGuard) { - use astria_core::generated::protocol::accounts::v1alpha1::NonceResponse; +pub async fn start() -> MockServer { let server = MockServer::start().await; - let startup_guard = mount_abci_query_mock( - &server, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 0, - }, - ) - .await; mount_genesis(&server, "test-chain-1").await; - (server, startup_guard) -} - -pub async fn mount_abci_query_mock( - server: &MockServer, - query_path: &str, - response: impl Message, -) -> MockGuard { - let expected_body = json!({ - "method": "abci_query" - }); - let response = tendermint_rpc::endpoint::abci_query::Response { - response: tendermint_rpc::endpoint::abci_query::AbciQuery { - value: response.encode_to_vec(), - ..Default::default() - }, - }; - let wrapper = response::Wrapper::new_with_id(Id::Num(1), Some(response), None); - Mock::given(body_partial_json(&expected_body)) - .and(body_string_contains(query_path)) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await + server } async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) { diff --git a/crates/astria-composer/tests/blackbox/helper/mock_grpc_sequencer.rs b/crates/astria-composer/tests/blackbox/helper/mock_grpc_sequencer.rs new file mode 100644 index 0000000000..546e3b9ebc --- /dev/null +++ b/crates/astria-composer/tests/blackbox/helper/mock_grpc_sequencer.rs @@ -0,0 +1,122 @@ +use std::{ + net::SocketAddr, + sync::Arc, +}; + +use astria_core::{ + self, + generated::sequencerblock::v1alpha1::{ + sequencer_service_server::{ + SequencerService, + SequencerServiceServer, + }, + FilteredSequencerBlock as RawFilteredSequencerBlock, + GetFilteredSequencerBlockRequest, + GetPendingNonceRequest, + GetPendingNonceResponse, + GetSequencerBlockRequest, + SequencerBlock as RawSequencerBlock, + }, +}; +use astria_eyre::eyre::{ + self, + WrapErr as _, +}; +use astria_grpc_mock::{ + matcher::message_type, + response::constant_response, + Mock, + MockServer, +}; +use tokio::task::JoinHandle; +use tonic::{ + transport::Server, + Request, + Response, + Status, +}; + +// NOTE: the actual full path name is +// /astria.sequencerblock.v1alpha1.SequencerService/GetPendingNonce +const GET_PENDING_NONCE_GRPC_NAME: &str = "get_pending_nonce"; + +pub struct MockGrpcSequencer { + _server: JoinHandle>, + pub(crate) mock_server: MockServer, + pub(crate) local_addr: SocketAddr, +} + +impl MockGrpcSequencer { + pub(crate) async fn spawn() -> Self { + use tokio_stream::wrappers::TcpListenerStream; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + + let mock_server = MockServer::new(); + + let server = { + let sequencer_service = SequencerServiceImpl(mock_server.clone()); + tokio::spawn(async move { + Server::builder() + .add_service(SequencerServiceServer::new(sequencer_service)) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .wrap_err("gRPC sequencer server failed") + }) + }; + Self { + _server: server, + mock_server, + local_addr, + } + } + + pub(crate) async fn mount_pending_nonce_response( + &self, + nonce_to_mount: u32, + debug_name: impl Into, + ) { + let resp = GetPendingNonceResponse { + inner: nonce_to_mount, + }; + Mock::for_rpc_given( + GET_PENDING_NONCE_GRPC_NAME, + message_type::(), + ) + .respond_with(constant_response(resp)) + .up_to_n_times(1) + .expect(1) + .with_name(debug_name) + .mount(&self.mock_server) + .await; + } +} + +struct SequencerServiceImpl(MockServer); + +#[tonic::async_trait] +impl SequencerService for SequencerServiceImpl { + async fn get_sequencer_block( + self: Arc, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + async fn get_filtered_sequencer_block( + self: Arc, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + async fn get_pending_nonce( + self: Arc, + request: Request, + ) -> Result, Status> { + self.0 + .handle_request(GET_PENDING_NONCE_GRPC_NAME, request) + .await + } +} diff --git a/crates/astria-composer/tests/blackbox/helper/mod.rs b/crates/astria-composer/tests/blackbox/helper/mod.rs index ecd5090441..d9405fdf93 100644 --- a/crates/astria-composer/tests/blackbox/helper/mod.rs +++ b/crates/astria-composer/tests/blackbox/helper/mod.rs @@ -29,6 +29,7 @@ use astria_core::{ }; use astria_eyre::eyre; use ethers::prelude::Transaction; +use mock_grpc_sequencer::MockGrpcSequencer; use telemetry::metrics; use tempfile::NamedTempFile; use tendermint_rpc::{ @@ -48,7 +49,8 @@ use wiremock::{ ResponseTemplate, }; -pub mod mock_sequencer; +pub mod mock_abci_sequencer; +pub mod mock_grpc_sequencer; static TELEMETRY: LazyLock<()> = LazyLock::new(|| { // This config can be meaningless - it's only used inside `try_init` to init the metrics, but we @@ -56,7 +58,8 @@ static TELEMETRY: LazyLock<()> = LazyLock::new(|| { let config = Config { log: String::new(), api_listen_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0), - sequencer_url: String::new(), + sequencer_abci_endpoint: String::new(), + sequencer_grpc_endpoint: String::new(), sequencer_chain_id: String::new(), rollups: String::new(), private_key_file: String::new(), @@ -96,7 +99,7 @@ pub struct TestComposer { pub composer: JoinHandle>, pub rollup_nodes: HashMap, pub sequencer: wiremock::MockServer, - pub setup_guard: MockGuard, + pub sequencer_mock: MockGrpcSequencer, pub grpc_collector_addr: SocketAddr, pub metrics_handle: metrics::Handle, } @@ -117,7 +120,8 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { rollup_nodes.insert((*id).to_string(), geth); rollups.push_str(&format!("{id}::{execution_url},")); } - let (sequencer, sequencer_setup_guard) = mock_sequencer::start().await; + let sequencer = mock_abci_sequencer::start().await; + let grpc_server = MockGrpcSequencer::spawn().await; let sequencer_url = sequencer.uri(); let keyfile = NamedTempFile::new().unwrap(); (&keyfile) @@ -128,7 +132,8 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { api_listen_addr: "127.0.0.1:0".parse().unwrap(), sequencer_chain_id: "test-chain-1".to_string(), rollups, - sequencer_url, + sequencer_abci_endpoint: sequencer_url.to_string(), + sequencer_grpc_endpoint: format!("http://{}", grpc_server.local_addr), private_key_file: keyfile.path().to_string_lossy().to_string(), sequencer_address_prefix: "astria".into(), block_time_ms: 2000, @@ -149,6 +154,11 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { .unwrap(); let metrics = Box::leak(Box::new(metrics)); + // prepare get nonce response + grpc_server + .mount_pending_nonce_response(0, "startup::wait_for_mempool()") + .await; + let (composer_addr, grpc_collector_addr, composer_handle) = { let composer = Composer::from_config(&config, metrics).await.unwrap(); let composer_addr = composer.local_addr(); @@ -163,7 +173,7 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { composer: composer_handle, rollup_nodes, sequencer, - setup_guard: sequencer_setup_guard, + sequencer_mock: grpc_server, grpc_collector_addr, metrics_handle, } @@ -332,6 +342,35 @@ pub async fn mount_broadcast_tx_sync_invalid_nonce_mock( .await } +/// Deserializes the bytes contained in a `tx_sync::Request` to a signed sequencer transaction and +/// verifies that the contained sequence action is for the given `expected_rollup_id`. It then +/// rejects the transaction for a taken nonce. +pub async fn mount_broadcast_tx_sync_nonce_taken_mock( + server: &MockServer, + expected_rollup_id: RollupId, +) -> MockGuard { + let matcher = move |request: &Request| { + let (rollup_id, _) = rollup_id_nonce_from_request(request); + rollup_id == expected_rollup_id + }; + let jsonrpc_rsp = response::Wrapper::new_with_id( + Id::Num(1), + Some(tx_sync::Response { + code: tendermint::abci::Code::Err(AbciErrorCode::NONCE_TAKEN.value()), + data: vec![].into(), + log: String::new(), + hash: tendermint::Hash::Sha256([0; 32]), + }), + None, + ); + Mock::given(matcher) + .respond_with(ResponseTemplate::new(200).set_body_json(&jsonrpc_rsp)) + .up_to_n_times(1) + .expect(1) + .mount_as_scoped(server) + .await +} + // A Uniswap V2 DAI-ETH swap transaction from mainnet // Etherscan link: https://etherscan.io/tx/0x99850dd1cf325c8ede9ba62b9d8a11aa199794450b581ce3a7bb8c1e5bb7562f pub const TEST_ETH_TX_JSON: &str = r#"{"blockHash":"0xe365f2163edb844b617ebe3d2af183b31d6c7ffa794f21d0b2d111d63e979a02","blockNumber":"0x1157959","from":"0xdc975a9bb00f4c030e4eb3268f68e4b8d0fa0362","gas":"0xcdf49","gasPrice":"0x374128344","maxFeePerGas":"0x374128344","maxPriorityFeePerGas":"0x0","hash":"0x99850dd1cf325c8ede9ba62b9d8a11aa199794450b581ce3a7bb8c1e5bb7562f","input":"0x022c0d9f0000000000000000000000000000000000000000000000c88a1ad5e15105525500000000000000000000000000000000000000000000000000000000000000000000000000000000000000001a2d11cb90d1de13bb81ee7b772a08ac234a8058000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000001208974000000000000000000000000000000000000000000000000000000004de4000000000000000000000000000000000000000000000000017038152c223cb100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000005200000000000000000000000000000000000000000000000000000000000000000000000000000000000000087870bca3f3fd6335c3f4ce8392d69350b4fa4e2000000000000000000000000ab12275f2d91f87b301a4f01c9af4e83b3f45baa0000000000000000000000006b175474e89094c44da98b954eedeac495271d0f000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2","nonce":"0x28","to":"0xa478c2975ab1ea89e8196811f51a7b7ade33eb11","transactionIndex":"0x2","value":"0x0","type":"0x2","accessList":[{"address":"0x5f4ec3df9cbd43714fe2740f5e3616155c5b8419","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000002"]},{"address":"0x7effd7b47bfd17e52fb7559d3f924201b9dbff3d","storageKeys":[]},{"address":"0x018008bfb33d285247a21d44e50697654f754e63","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"]},{"address":"0x1a2d11cb90d1de13bb81ee7b772a08ac234a8058","storageKeys":[]},{"address":"0xe62b71cf983019bff55bc83b48601ce8419650cc","storageKeys":["0x9a09f352b299559621084d9b8d2625e8d5a97f382735872dd3bb1bdbdccc3fee","0x000000000000000000000000000000000000000000000000000000000000002b","0xfee3a99380070b792e111dd9a6a15e929983e2d0b7e170a5520e51b99be0c359"]},{"address":"0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc2887152941c","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc28871529420","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec6","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4b","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ebf","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec0","0x4c0bd942d17410ca1f6d3278a62feef7078602605466e37de958808f1454efbd","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e48","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec3","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4f","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4a","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e50","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4d","0x4cb2b152c1b54ce671907a93c300fd5aa72383a9d4ec19a81e3333632ae92e00","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec4","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec7","0x4bea7244bd9088ac961c659a818b4f060de9712d20dc006c24f0985f19cf62d1","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e49","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec2","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc2887152941d","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4c","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4e","0x4480713a5820391a4815a640728dab70c3847e45854ef9e8117382da26ce9105","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc2887152941f","0x000000000000000000000000000000000000000000000000000000000000003b","0x108718ddd11d4cf696a068770009c44aef387eb858097a37824291f99278d5e3","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec1","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec5"]},{"address":"0x2f39d218133afab8f2b819b1066c7e434ad94e9e","storageKeys":["0x740f710666bd7a12af42df98311e541e47f7fd33d382d11602457a6d540cbd63","0x0d2c1bcee56447b4f46248272f34207a580a5c40f666a31f4e2fbb470ea53ab8"]},{"address":"0xe7b67f44ea304dd7f6d215b13686637ff64cd2b2","storageKeys":[]},{"address":"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2","storageKeys":["0x7f6377583d24615ddfe989626525aeed0d158f924ee8c91664ab0dffd7863d00","0x3afb575d989d656a39ee0690da12b019915f3bd8709cc522e681b8dd04237970","0xa535fbd0ab3e0ad4ee444570368f3d474545b71fcc49228fe96a6406676fc126","0xb064600732a82908427d092d333e607598a6238a59aeb45e1288cb0bac7161cf"]},{"address":"0x4d5f47fa6a74757f35c14fd3a6ef8e3c9bc514e8","storageKeys":["0x000000000000000000000000000000000000000000000000000000000000003c","0x14a553e31736f19e3e380cf55bfb2f82dfd6d880cd07235affb68d8d3e0cac4d","0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x5e8cc6ee686108b7fd15638e2dbb32555b30d0bd1a191628bb70b5459b86cedc","0x000000000000000000000000000000000000000000000000000000000000003d","0x0000000000000000000000000000000000000000000000000000000000000036","0x0000000000000000000000000000000000000000000000000000000000000039"]},{"address":"0x6b175474e89094c44da98b954eedeac495271d0f","storageKeys":["0xd86cc1e239204d48eb0055f151744c4bb3d2337612287be803ae8247e95a67d2","0xe7ab5c3b3c86286a122f1937d4c70a3170dba7ef4f7603d830e8bcf7c9af583b","0x87c358b8e65d7446f52ffce25e44c9673d2bf461b3d3e4748afcf1238e9224a3","0xad740bfd58072c0bd719418966c52da18e837afec1b47e07bba370568cc87fbb"]},{"address":"0xe175de51f29d822b86e46a9a61246ec90631210d","storageKeys":[]},{"address":"0xcf8d0c70c850859266f5c338b38f9d663181c314","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000037","0x000000000000000000000000000000000000000000000000000000000000003d","0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003a","0x4bea7244bd9088ac961c659a818b4f060de9712d20dc006c24f0985f19cf62d1"]},{"address":"0x413adac9e2ef8683adf5ddaece8f19613d60d1bb","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003f","0x000000000000000000000000000000000000000000000000000000000000003a","0x4bea7244bd9088ac961c659a818b4f060de9712d20dc006c24f0985f19cf62d1"]},{"address":"0xaed0c38402a5d19df6e4c03f4e2dced6e29c1ee9","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000002"]},{"address":"0xea51d7853eefb32b6ee06b1c12e6dcca88be0ffe","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003a"]},{"address":"0x54586be62e3c3580375ae3723c145253060ca0c2","storageKeys":["0x7145bb02480b505fc02ccfdba07d3ba3a9d821606f0688263abedd0ac6e5bec5","0x2a11cb67ca5c7e99dba99b50e02c11472d0f19c22ed5af42a1599a7f57e1c7a4","0x5306b8fbe80b30a74098357ee8e26fad8dc069da9011cca5f0870a0a5982e541"]},{"address":"0x478238a1c8b862498c74d0647329aef9ea6819ed","storageKeys":["0x9ef04667c5a1bd8192837ceac2ad5f2c41549d4db3406185e8c6aa95ea557bc5","0x000000000000000000000000000000000000000000000000000000000000002b","0x0020b304a2489d03d215fadd3bb6d3de2dda5a6a1235e76d693c30263e3cd054"]},{"address":"0xa700b4eb416be35b2911fd5dee80678ff64ff6c9","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x5e8cc6ee686108b7fd15638e2dbb32555b30d0bd1a191628bb70b5459b86cedc"]},{"address":"0x8164cc65827dcfe994ab23944cbc90e0aa80bfcb","storageKeys":["0x76f8b43dabb591eb6681562420f7f6aa393e6903d4e02e6f59e2957d94ceab20","0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x176062dac4e737f036c34baf4b07185f9c9fd3c1337ca36eb7c1f7a74aedb8ea"]},{"address":"0x9a158802cd924747ef336ca3f9de3bdb60cf43d3","storageKeys":[]},{"address":"0xac725cb59d16c81061bdea61041a8a5e73da9ec6","storageKeys":[]},{"address":"0x15c5620dffac7c7366eed66c20ad222ddbb1ed57","storageKeys":[]},{"address":"0x547a514d5e3769680ce22b2361c10ea13619e8a9","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000002"]},{"address":"0x8116b273cd75d79c382afacc706659ded5e0a59d","storageKeys":["0x0fb35ae12d348b84dc0910bcce7d3b0a3f6d23a3e1d0b53bbe5f135078b97b13","0x000000000000000000000000000000000000000000000000000000000000002b","0x1d90d8e683e6736ac0564a19732a642e4be100e7ee8c225feba909bbdaf1522b"]},{"address":"0x9f8ccdafcc39f3c7d6ebf637c9151673cbc36b88","storageKeys":[]},{"address":"0xa478c2975ab1ea89e8196811f51a7b7ade33eb11","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000007","0x0000000000000000000000000000000000000000000000000000000000000009","0x000000000000000000000000000000000000000000000000000000000000000a","0x000000000000000000000000000000000000000000000000000000000000000c","0x0000000000000000000000000000000000000000000000000000000000000008","0x0000000000000000000000000000000000000000000000000000000000000006"]},{"address":"0xf1cd4193bbc1ad4a23e833170f49d60f3d35a621","storageKeys":[]},{"address":"0x102633152313c81cd80419b6ecf66d14ad68949a","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003f","0x000000000000000000000000000000000000000000000000000000000000003a"]},{"address":"0xb02381b1d27aa9845e5012083ca288c1818884f0","storageKeys":[]}],"chainId":"0x1","v":"0x0","r":"0xcb4eccf09e298388220c5560a6539322bde17581cee6908d56a92a19575e28e2","s":"0x2b4e34adad48aee14b6600c6366ad683c00c63c9da88fc2a232308421cf69a21"}"#;