diff --git a/Cargo.lock b/Cargo.lock index d116d2712a..4843a90f43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,7 @@ dependencies = [ "astria-config", "astria-core", "astria-eyre", + "astria-grpc-mock", "astria-sequencer-client", "astria-telemetry", "astria-test-utils", @@ -601,6 +602,7 @@ dependencies = [ "ethers", "futures", "hex", + "http 0.2.12", "humantime", "hyper 0.14.30", "insta", diff --git a/charts/composer/Chart.yaml b/charts/composer/Chart.yaml index a5e2ac72fe..26b0666774 100644 --- a/charts/composer/Chart.yaml +++ b/charts/composer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.4 +version: 0.1.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/composer/templates/configmap.yaml b/charts/composer/templates/configmap.yaml index 3d9b13a739..92e0d8ae5f 100644 --- a/charts/composer/templates/configmap.yaml +++ b/charts/composer/templates/configmap.yaml @@ -8,7 +8,8 @@ data: ASTRIA_COMPOSER_API_LISTEN_ADDR: "0.0.0.0:{{ .Values.ports.healthApi }}" ASTRIA_COMPOSER_GRPC_ADDR: "0.0.0.0:{{ .Values.ports.grpc }}" ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.sequencerChainId . }}" - ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}" + ASTRIA_COMPOSER_SEQUENCER_HTTP_URL: "{{ tpl .Values.config.sequencerRpc . }}" + ASTRIA_COMPOSER_SEQUENCER_GRPC_URL: "{{ tpl .Values.config.sequencerGrpc . }}" ASTRIA_COMPOSER_ROLLUPS: "{{ include "composer.rollups" . }}" ASTRIA_COMPOSER_PRIVATE_KEY_FILE: "/var/secrets/{{ .Values.config.privateKey.secret.filename }}" ASTRIA_COMPOSER_MAX_BYTES_PER_BUNDLE: "{{ .Values.config.maxBytesPerBundle }}" diff --git a/charts/composer/values.yaml b/charts/composer/values.yaml index 1aec56c3e1..68745a2c43 100644 --- a/charts/composer/values.yaml +++ b/charts/composer/values.yaml @@ -7,10 +7,10 @@ global: images: composer: - repo: ghcr.io/astriaorg/composer + repo: ghcr.io/astriaorg/astria-composer pullPolicy: IfNotPresent - tag: "0.8.3" - devTag: latest + tag: "0.8.4" + devTag: local config: logLevel: "debug" @@ -20,6 +20,7 @@ config: sequencerAddressPrefix: astria sequencerNativeAssetBaseDenomination: "nria" sequencerRpc: "" + sequencerGrpc: "" sequencerChainId: "" privateKey: devContent: "" diff --git a/charts/evm-stack/Chart.lock b/charts/evm-stack/Chart.lock index 7b103a22aa..d9fbfd629d 100644 --- a/charts/evm-stack/Chart.lock +++ b/charts/evm-stack/Chart.lock @@ -7,7 +7,7 @@ dependencies: version: 0.27.5 - name: composer repository: file://../composer - version: 0.1.4 + version: 0.1.5 - name: evm-faucet repository: file://../evm-faucet version: 0.1.2 @@ -20,5 +20,5 @@ dependencies: - name: blockscout-stack repository: https://blockscout.github.io/helm-charts version: 1.6.2 -digest: sha256:2d7e2f23cd9bbdb43b7cf42112db9ede0a7d5eee9e426b0b2344e43fcf52e1b1 -generated: "2024-10-02T09:43:51.238571-04:00" +digest: sha256:84cd9e57ee284191d1637ce8fac850a3403932521c0d23a18f0beb0eb27dd398 +generated: "2024-10-08T18:13:14.539588+02:00" diff --git a/charts/evm-stack/Chart.yaml b/charts/evm-stack/Chart.yaml index e2cf556597..db374800b6 100644 --- a/charts/evm-stack/Chart.yaml +++ b/charts/evm-stack/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.6.2 +version: 0.6.3 dependencies: - name: celestia-node @@ -26,7 +26,7 @@ dependencies: version: 0.27.5 repository: "file://../evm-rollup" - name: composer - version: 0.1.4 + version: 0.1.5 repository: "file://../composer" condition: composer.enabled - name: evm-faucet diff --git a/crates/astria-composer/Cargo.toml b/crates/astria-composer/Cargo.toml index c17567aaac..1cc5b91a26 100644 --- a/crates/astria-composer/Cargo.toml +++ b/crates/astria-composer/Cargo.toml @@ -12,8 +12,14 @@ homepage = "https://astria.org" name = "astria-composer" [dependencies] +http = "0.2.9" + astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["serde", "server"] } +astria-core = { path = "../astria-core", features = [ + "client", + "serde", + "server", +] } astria-eyre = { path = "../astria-eyre" } config = { package = "astria-config", path = "../astria-config" } telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [ @@ -59,6 +65,7 @@ path = "../astria-sequencer-client" features = ["http"] [dev-dependencies] +astria-grpc-mock = { path = "../astria-grpc-mock" } config = { package = "astria-config", path = "../astria-config", features = [ "tests", ] } diff --git a/crates/astria-composer/local.env.example b/crates/astria-composer/local.env.example index 8936089de1..768b97962f 100644 --- a/crates/astria-composer/local.env.example +++ b/crates/astria-composer/local.env.example @@ -25,7 +25,10 @@ NO_COLOR= ASTRIA_COMPOSER_API_LISTEN_ADDR="0.0.0.0:0" # Address of the RPC server for the sequencer chain -ASTRIA_COMPOSER_SEQUENCER_URL="http://127.0.0.1:26657" +ASTRIA_COMPOSER_SEQUENCER_HTTP_URL="http://127.0.0.1:26657" + +# Address of the RPC server for the sequencer chain +ASTRIA_COMPOSER_SEQUENCER_GRPC_URL="http://127.0.0.1:8080" # Chain ID of the sequencer chain which transactions are submitted to. ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID="astria-dev-1" diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index dc9ab4ce17..0f08932d16 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -124,7 +124,8 @@ impl Composer { let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), + sequencer_http_url: cfg.sequencer_http_url.clone(), + sequencer_grpc_url: cfg.sequencer_grpc_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), private_key_file: cfg.private_key_file.clone(), sequencer_address_prefix: cfg.sequencer_address_prefix.clone(), diff --git a/crates/astria-composer/src/config.rs b/crates/astria-composer/src/config.rs index b3f127d9ab..1198b66c5b 100644 --- a/crates/astria-composer/src/config.rs +++ b/crates/astria-composer/src/config.rs @@ -27,7 +27,10 @@ pub struct Config { pub api_listen_addr: SocketAddr, /// Address of the RPC server for the sequencer chain - pub sequencer_url: String, + pub sequencer_http_url: String, + + /// Address of the GRPC server for the sequencer chain + pub sequencer_grpc_url: String, /// The chain ID of the sequencer chain pub sequencer_chain_id: String, diff --git a/crates/astria-composer/src/executor/builder.rs b/crates/astria-composer/src/executor/builder.rs index 7234600f05..0442f6f66c 100644 --- a/crates/astria-composer/src/executor/builder.rs +++ b/crates/astria-composer/src/executor/builder.rs @@ -6,6 +6,7 @@ use std::{ use astria_core::{ crypto::SigningKey, + generated::sequencerblock::v1alpha1::sequencer_service_client::SequencerServiceClient, primitive::v1::Address, protocol::transaction::v1alpha1::action::SequenceAction, }; @@ -14,6 +15,7 @@ use astria_eyre::eyre::{ eyre, WrapErr as _, }; +use http::Uri; use tokio::sync::watch; use tokio_util::sync::CancellationToken; @@ -24,7 +26,8 @@ use crate::{ }; pub(crate) struct Builder { - pub(crate) sequencer_url: String, + pub(crate) sequencer_http_url: String, + pub(crate) sequencer_grpc_url: String, pub(crate) sequencer_chain_id: String, pub(crate) private_key_file: String, pub(crate) sequencer_address_prefix: String, @@ -38,7 +41,8 @@ pub(crate) struct Builder { impl Builder { pub(crate) fn build(self) -> eyre::Result<(super::Executor, executor::Handle)> { let Self { - sequencer_url, + sequencer_http_url, + sequencer_grpc_url, sequencer_chain_id, private_key_file, sequencer_address_prefix, @@ -48,8 +52,14 @@ impl Builder { shutdown_token, metrics, } = self; - let sequencer_client = sequencer_client::HttpClient::new(sequencer_url.as_str()) - .wrap_err("failed constructing sequencer client")?; + let sequencer_http_client = sequencer_client::HttpClient::new(sequencer_http_url.as_str()) + .wrap_err("failed constructing sequencer http client")?; + + let sequencer_grpc_client = connect_sequencer_grpc(sequencer_grpc_url.as_str()) + .wrap_err_with(|| { + format!("failed to connect to sequencer over gRPC at `{sequencer_grpc_url}`") + })?; + let (status, _) = watch::channel(Status::new()); let sequencer_key = read_signing_key_from_file(&private_key_file).wrap_err_with(|| { @@ -69,7 +79,8 @@ impl Builder { super::Executor { status, serialized_rollup_transactions: serialized_rollup_transaction_rx, - sequencer_client, + sequencer_http_client, + sequencer_grpc_client, sequencer_chain_id, sequencer_key, address: sequencer_address, @@ -91,3 +102,14 @@ fn read_signing_key_from_file>(path: P) -> eyre::Result eyre::Result> { + let uri: Uri = sequencer_grpc_endpoint + .parse() + .wrap_err("failed to parse endpoint as URI")?; + Ok(SequencerServiceClient::new( + tonic::transport::Endpoint::from(uri).connect_lazy(), + )) +} diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index 898f3bde4e..f417258901 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -12,6 +12,13 @@ use std::{ use astria_core::{ crypto::SigningKey, + generated::sequencerblock::v1alpha1::{ + sequencer_service_client::{ + self, + SequencerServiceClient, + }, + GetPendingNonceRequest, + }, protocol::{ abci::AbciErrorCode, transaction::v1alpha1::{ @@ -64,6 +71,7 @@ use tokio::{ }, }; use tokio_util::sync::CancellationToken; +use tonic::transport::Channel; use tracing::{ debug, error, @@ -122,8 +130,10 @@ pub(super) struct Executor { // Channel for receiving `SequenceAction`s to be bundled. serialized_rollup_transactions: mpsc::Receiver, // The client for submitting wrapped and signed pending eth transactions to the astria - // sequencer. - sequencer_client: sequencer_client::HttpClient, + // sequencer via the cometbft client. + sequencer_http_client: sequencer_client::HttpClient, + // The grpc client for grabbing the latest nonce from. + sequencer_grpc_client: sequencer_service_client::SequencerServiceClient, // The chain id used for submission of transactions to the sequencer. sequencer_chain_id: String, // Private key used to sign sequencer transactions @@ -197,7 +207,8 @@ impl Executor { metrics: &'static Metrics, ) -> Fuse> { SubmitFut { - client: self.sequencer_client.clone(), + sequencer_http_client: self.sequencer_http_client.clone(), + sequencer_grpc_client: self.sequencer_grpc_client.clone(), address: self.address, nonce, chain_id: self.sequencer_chain_id.clone(), @@ -332,9 +343,13 @@ impl Executor { self.ensure_chain_id_is_correct() .await .wrap_err("failed to validate chain id")?; - let nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics) - .await - .wrap_err("failed getting initial nonce from sequencer")?; + let nonce = get_pending_nonce( + self.sequencer_grpc_client.clone(), + self.address, + self.metrics, + ) + .await + .wrap_err("failed getting initial nonce from sequencer")?; Ok(nonce) } @@ -379,7 +394,7 @@ impl Executor { }, ); let client_genesis: tendermint::Genesis = - tryhard::retry_fn(|| self.sequencer_client.genesis()) + tryhard::retry_fn(|| self.sequencer_http_client.genesis()) .with_config(retry_config) .await?; Ok(client_genesis.chain_id) @@ -460,23 +475,21 @@ impl Executor { } } -/// Queries the sequencer for the latest nonce with an exponential backoff -#[instrument(name = "get latest nonce", skip_all, fields(%address), err)] -async fn get_latest_nonce( - client: sequencer_client::HttpClient, +/// Queries the sequencer for the latest pending nonce with an exponential backoff +#[instrument(name = "get pending nonce", skip_all, fields(%address), err)] +async fn get_pending_nonce( + client: sequencer_service_client::SequencerServiceClient, address: Address, metrics: &Metrics, ) -> eyre::Result { - debug!("fetching latest nonce from sequencer"); + debug!("fetching pending nonce from sequencing"); let span = Span::current(); let start = Instant::now(); let retry_config = tryhard::RetryFutureConfig::new(1024) .exponential_backoff(Duration::from_millis(200)) .max_delay(Duration::from_secs(60)) .on_retry( - |attempt, - next_delay: Option, - err: &sequencer_client::extension_trait::Error| { + |attempt, next_delay: Option, err: &tonic::Status| { metrics.increment_nonce_fetch_failure_count(); let wait_duration = next_delay @@ -493,14 +506,22 @@ async fn get_latest_nonce( }, ); let res = tryhard::retry_fn(|| { - let client = client.clone(); - let span = info_span!(parent: span.clone(), "attempt get nonce"); + let mut client = client.clone(); + let span = info_span!(parent: span.clone(), "attempt get pending nonce"); metrics.increment_nonce_fetch_count(); - async move { client.get_latest_nonce(address).await.map(|rsp| rsp.nonce) }.instrument(span) + async move { + client + .get_pending_nonce(GetPendingNonceRequest { + address: Some(address.into_raw()), + }) + .await + .map(|rsp| rsp.into_inner().inner) + } + .instrument(span) }) .with_config(retry_config) .await - .wrap_err("failed getting latest nonce from sequencer after 1024 attempts"); + .wrap_err("failed getting pending nonce from sequencing after 1024 attempts"); metrics.record_nonce_fetch_latency(start.elapsed()); @@ -517,7 +538,7 @@ async fn get_latest_nonce( err, )] async fn submit_tx( - client: sequencer_client::HttpClient, + sequencer_http_client: sequencer_client::HttpClient, tx: SignedTransaction, metrics: &Metrics, ) -> eyre::Result { @@ -552,7 +573,7 @@ async fn submit_tx( }, ); let res = tryhard::retry_fn(|| { - let client = client.clone(); + let client = sequencer_http_client.clone(); let tx = tx.clone(); let span = info_span!(parent: span.clone(), "attempt send"); async move { client.submit_transaction_sync(tx).await }.instrument(span) @@ -637,7 +658,8 @@ pin_project! { /// If the sequencer returned a non-zero abci code (albeit not `INVALID_NONCE`), this future will return with /// that nonce it used to submit the non-zero abci code request. struct SubmitFut { - client: sequencer_client::HttpClient, + sequencer_http_client: sequencer_client::HttpClient, + sequencer_grpc_client: SequencerServiceClient, address: Address, chain_id: String, nonce: u32, @@ -688,7 +710,8 @@ impl Future for SubmitFut { "submitting transaction to sequencer", ); SubmitState::WaitingForSend { - fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(), + fut: submit_tx(this.sequencer_http_client.clone(), tx, self.metrics) + .boxed(), } } @@ -716,8 +739,8 @@ impl Future for SubmitFut { new nonce" ); SubmitState::WaitingForNonce { - fut: get_latest_nonce( - this.client.clone(), + fut: get_pending_nonce( + this.sequencer_grpc_client.clone(), *this.address, self.metrics, ) @@ -761,7 +784,8 @@ impl Future for SubmitFut { "resubmitting transaction to sequencer with new nonce", ); SubmitState::WaitingForSend { - fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(), + fut: submit_tx(this.sequencer_http_client.clone(), tx, self.metrics) + .boxed(), } } Err(error) => { diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index bf0f8d9ee9..445e14a5e3 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -78,7 +78,8 @@ static TELEMETRY: LazyLock<()> = LazyLock::new(|| { let config = Config { log: String::new(), api_listen_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0), - sequencer_url: String::new(), + sequencer_http_url: String::new(), + sequencer_grpc_url: String::new(), sequencer_chain_id: String::new(), rollups: String::new(), private_key_file: String::new(), @@ -133,7 +134,8 @@ async fn setup() -> (MockServer, Config, NamedTempFile) { log: String::new(), api_listen_addr: "127.0.0.1:0".parse().unwrap(), rollups: String::new(), - sequencer_url: server.uri(), + sequencer_http_url: server.uri(), + sequencer_grpc_url: server.uri(), sequencer_chain_id: "test-chain-1".to_string(), private_key_file: keyfile.path().to_string_lossy().to_string(), sequencer_address_prefix: "astria".into(), @@ -327,7 +329,8 @@ async fn full_bundle() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), + sequencer_http_url: cfg.sequencer_http_url.clone(), + sequencer_grpc_url: cfg.sequencer_grpc_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), private_key_file: cfg.private_key_file.clone(), sequencer_address_prefix: "astria".into(), @@ -418,7 +421,8 @@ async fn bundle_triggered_by_block_timer() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), + sequencer_http_url: cfg.sequencer_http_url.clone(), + sequencer_grpc_url: cfg.sequencer_grpc_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), private_key_file: cfg.private_key_file.clone(), sequencer_address_prefix: "astria".into(), @@ -506,7 +510,8 @@ async fn two_seq_actions_single_bundle() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&cfg).unwrap())); mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; let (executor, executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), + sequencer_http_url: cfg.sequencer_http_url.clone(), + sequencer_grpc_url: cfg.sequencer_grpc_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), private_key_file: cfg.private_key_file.clone(), sequencer_address_prefix: "astria".into(), @@ -609,7 +614,8 @@ async fn chain_id_mismatch_returns_error() { // build the executor with the correct chain_id let (executor, _executor_handle) = executor::Builder { - sequencer_url: cfg.sequencer_url.clone(), + sequencer_http_url: cfg.sequencer_http_url.clone(), + sequencer_grpc_url: cfg.sequencer_grpc_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), private_key_file: cfg.private_key_file.clone(), sequencer_address_prefix: cfg.sequencer_address_prefix.clone(), diff --git a/crates/astria-composer/tests/blackbox/geth_collector.rs b/crates/astria-composer/tests/blackbox/geth_collector.rs index e6bce832e9..08df4079a0 100644 --- a/crates/astria-composer/tests/blackbox/geth_collector.rs +++ b/crates/astria-composer/tests/blackbox/geth_collector.rs @@ -1,9 +1,6 @@ use std::time::Duration; -use astria_core::{ - generated::protocol::accounts::v1alpha1::NonceResponse, - primitive::v1::RollupId, -}; +use astria_core::primitive::v1::RollupId; use ethers::types::Transaction; use crate::helper::{ @@ -19,12 +16,6 @@ async fn tx_from_one_rollup_is_received_by_sequencer() { // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; let mock_guard = @@ -47,12 +38,6 @@ async fn collector_restarts_after_exit() { // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); // get rollup node let rollup_node = test_composer.rollup_nodes.get("test1").unwrap(); @@ -84,17 +69,9 @@ async fn collector_restarts_after_exit() { #[tokio::test] async fn invalid_nonce_causes_resubmission_under_different_nonce() { - use crate::helper::mock_sequencer::mount_abci_query_mock; - // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); // Reject the first transaction for invalid nonce let invalid_nonce_guard = mount_broadcast_tx_sync_invalid_nonce_mock( @@ -103,22 +80,17 @@ async fn invalid_nonce_causes_resubmission_under_different_nonce() { ) .await; - // Mount a response of 0 to a nonce query - let nonce_refetch_guard = mount_abci_query_mock( - &test_composer.sequencer, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 1, - }, - ) - .await; - let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; // Expect nonce 1 again so that the resubmitted tx is accepted let valid_nonce_guard = mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![1]).await; + // Mount a response of 1 to a nonce query + test_composer + .sequencer_mock + .mount_pending_nonce_response(1, "setup correct nonce") + .await; + // Push a tx to the rollup node so that it is picked up by the composer and submitted with the // stored nonce of 0, triggering the nonce refetch process test_composer.rollup_nodes["test1"] @@ -127,19 +99,12 @@ async fn invalid_nonce_causes_resubmission_under_different_nonce() { // wait for 1 sequencer block time to make sure the bundle is preempted tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), + Duration::from_millis(test_composer.cfg.block_time_ms + 1000), invalid_nonce_guard.wait_until_satisfied(), ) .await .expect("sequencer tx should have been rejected due to invalid nonce"); - tokio::time::timeout( - Duration::from_millis(100), - nonce_refetch_guard.wait_until_satisfied(), - ) - .await - .expect("new nonce should have been fetched from the sequencer"); - tokio::time::timeout( Duration::from_millis(100), valid_nonce_guard.wait_until_satisfied(), @@ -153,12 +118,6 @@ async fn single_rollup_tx_payload_integrity() { // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); let tx: Transaction = serde_json::from_str(TEST_ETH_TX_JSON).unwrap(); let mock_guard = diff --git a/crates/astria-composer/tests/blackbox/grpc_collector.rs b/crates/astria-composer/tests/blackbox/grpc_collector.rs index 8a4a963acc..bc0586a073 100644 --- a/crates/astria-composer/tests/blackbox/grpc_collector.rs +++ b/crates/astria-composer/tests/blackbox/grpc_collector.rs @@ -1,12 +1,9 @@ use std::time::Duration; use astria_core::{ - generated::{ - composer::v1alpha1::{ - grpc_collector_service_client::GrpcCollectorServiceClient, - SubmitRollupTransactionRequest, - }, - protocol::accounts::v1alpha1::NonceResponse, + generated::composer::v1alpha1::{ + grpc_collector_service_client::GrpcCollectorServiceClient, + SubmitRollupTransactionRequest, }, primitive::v1::RollupId, }; @@ -24,13 +21,6 @@ use crate::helper::{ #[tokio::test] async fn tx_from_one_rollup_is_received_by_sequencer() { let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer were not setup successfully"); - let rollup_id = RollupId::from_unhashed_bytes("test1"); let expected_chain_ids = vec![rollup_id]; let mock_guard = @@ -63,33 +53,20 @@ async fn tx_from_one_rollup_is_received_by_sequencer() { #[tokio::test] async fn invalid_nonce_causes_resubmission_under_different_nonce() { - use crate::helper::mock_sequencer::mount_abci_query_mock; - // Spawn a composer with a mock sequencer and a mock rollup node // Initial nonce is 0 let rollup_id = RollupId::from_unhashed_bytes("test1"); let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); // Reject the first transaction for invalid nonce let invalid_nonce_guard = mount_broadcast_tx_sync_invalid_nonce_mock(&test_composer.sequencer, rollup_id).await; - // Mount a response of 0 to a nonce query - let nonce_refetch_guard = mount_abci_query_mock( - &test_composer.sequencer, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 1, - }, - ) - .await; + // Mount a response of 1 to a nonce query + test_composer + .sequencer_mock + .mount_pending_nonce_response(1, "setup correct nonce") + .await; let expected_chain_ids = vec![rollup_id]; // Expect nonce 1 again so that the resubmitted tx is accepted @@ -122,13 +99,6 @@ async fn invalid_nonce_causes_resubmission_under_different_nonce() { .await .expect("sequencer tx should have been rejected due to invalid nonce"); - tokio::time::timeout( - Duration::from_millis(100), - nonce_refetch_guard.wait_until_satisfied(), - ) - .await - .expect("new nonce should have been fetched from the sequencer"); - tokio::time::timeout( Duration::from_millis(100), valid_nonce_guard.wait_until_satisfied(), @@ -143,12 +113,6 @@ async fn single_rollup_tx_payload_integrity() { // Initial nonce is 0 let rollup_id = RollupId::from_unhashed_bytes("test1"); let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("composer and sequencer should have been setup successfully"); let tx: Transaction = serde_json::from_str(TEST_ETH_TX_JSON).unwrap(); let mock_guard = diff --git a/crates/astria-composer/tests/blackbox/helper/mock_grpc_sequencer.rs b/crates/astria-composer/tests/blackbox/helper/mock_grpc_sequencer.rs new file mode 100644 index 0000000000..3a96d9165a --- /dev/null +++ b/crates/astria-composer/tests/blackbox/helper/mock_grpc_sequencer.rs @@ -0,0 +1,120 @@ +use std::{ + net::SocketAddr, + sync::Arc, +}; + +use astria_core::{ + self, + generated::sequencerblock::v1alpha1::{ + sequencer_service_server::{ + SequencerService, + SequencerServiceServer, + }, + FilteredSequencerBlock as RawFilteredSequencerBlock, + GetFilteredSequencerBlockRequest, + GetPendingNonceRequest, + GetPendingNonceResponse, + GetSequencerBlockRequest, + SequencerBlock as RawSequencerBlock, + }, +}; +use astria_eyre::eyre::{ + self, + WrapErr as _, +}; +use astria_grpc_mock::{ + matcher::message_type, + response::constant_response, + Mock, + MockServer, +}; +use tokio::task::JoinHandle; +use tonic::{ + transport::Server, + Request, + Response, + Status, +}; + +const GET_PENDING_NONCE_GRPC_NAME: &str = "get_pending_nonce"; + +pub struct MockGrpcSequencer { + _server: JoinHandle>, + pub(crate) mock_server: MockServer, + pub(crate) local_addr: SocketAddr, +} + +impl MockGrpcSequencer { + pub(crate) async fn spawn() -> Self { + use tokio_stream::wrappers::TcpListenerStream; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + + let mock_server = MockServer::new(); + + let server = { + let sequencer_service = SequencerServiceImpl(mock_server.clone()); + tokio::spawn(async move { + Server::builder() + .add_service(SequencerServiceServer::new(sequencer_service)) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .wrap_err("gRPC sequencer server failed") + }) + }; + Self { + _server: server, + mock_server, + local_addr, + } + } + + pub(crate) async fn mount_pending_nonce_response( + &self, + nonce_to_mount: u32, + debug_name: impl Into, + ) { + let resp = GetPendingNonceResponse { + inner: nonce_to_mount, + }; + Mock::for_rpc_given( + GET_PENDING_NONCE_GRPC_NAME, + message_type::(), + ) + .respond_with(constant_response(resp)) + .up_to_n_times(1) + .expect(1) + .with_name(debug_name) + .mount(&self.mock_server) + .await; + } +} + +struct SequencerServiceImpl(MockServer); + +#[tonic::async_trait] +impl SequencerService for SequencerServiceImpl { + async fn get_sequencer_block( + self: Arc, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + async fn get_filtered_sequencer_block( + self: Arc, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + async fn get_pending_nonce( + self: Arc, + request: Request, + ) -> Result, Status> { + self.0 + .handle_request(GET_PENDING_NONCE_GRPC_NAME, request) + .await + } +} diff --git a/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs b/crates/astria-composer/tests/blackbox/helper/mock_http_sequencer.rs similarity index 63% rename from crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs rename to crates/astria-composer/tests/blackbox/helper/mock_http_sequencer.rs index da8dea87e0..28f5ca53a2 100644 --- a/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs +++ b/crates/astria-composer/tests/blackbox/helper/mock_http_sequencer.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use prost::Message; use serde_json::json; use tendermint::{ consensus::{ @@ -13,63 +12,17 @@ use tendermint::{ Genesis, Time, }; -use tendermint_rpc::{ - response, - Id, -}; use wiremock::{ - matchers::{ - body_partial_json, - body_string_contains, - }, + matchers::body_partial_json, Mock, - MockGuard, MockServer, ResponseTemplate, }; -pub async fn start() -> (MockServer, MockGuard) { - use astria_core::generated::protocol::accounts::v1alpha1::NonceResponse; +pub async fn start() -> MockServer { let server = MockServer::start().await; - let startup_guard = mount_abci_query_mock( - &server, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 0, - }, - ) - .await; mount_genesis(&server, "test-chain-1").await; - (server, startup_guard) -} - -pub async fn mount_abci_query_mock( - server: &MockServer, - query_path: &str, - response: impl Message, -) -> MockGuard { - let expected_body = json!({ - "method": "abci_query" - }); - let response = tendermint_rpc::endpoint::abci_query::Response { - response: tendermint_rpc::endpoint::abci_query::AbciQuery { - value: response.encode_to_vec(), - ..Default::default() - }, - }; - let wrapper = response::Wrapper::new_with_id(Id::Num(1), Some(response), None); - Mock::given(body_partial_json(&expected_body)) - .and(body_string_contains(query_path)) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(&wrapper) - .append_header("Content-Type", "application/json"), - ) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await + server } async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) { diff --git a/crates/astria-composer/tests/blackbox/helper/mod.rs b/crates/astria-composer/tests/blackbox/helper/mod.rs index ecd5090441..b5d8528e63 100644 --- a/crates/astria-composer/tests/blackbox/helper/mod.rs +++ b/crates/astria-composer/tests/blackbox/helper/mod.rs @@ -29,6 +29,7 @@ use astria_core::{ }; use astria_eyre::eyre; use ethers::prelude::Transaction; +use mock_grpc_sequencer::MockGrpcSequencer; use telemetry::metrics; use tempfile::NamedTempFile; use tendermint_rpc::{ @@ -48,7 +49,8 @@ use wiremock::{ ResponseTemplate, }; -pub mod mock_sequencer; +pub mod mock_grpc_sequencer; +pub mod mock_http_sequencer; static TELEMETRY: LazyLock<()> = LazyLock::new(|| { // This config can be meaningless - it's only used inside `try_init` to init the metrics, but we @@ -56,7 +58,8 @@ static TELEMETRY: LazyLock<()> = LazyLock::new(|| { let config = Config { log: String::new(), api_listen_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0), - sequencer_url: String::new(), + sequencer_http_url: String::new(), + sequencer_grpc_url: String::new(), sequencer_chain_id: String::new(), rollups: String::new(), private_key_file: String::new(), @@ -96,7 +99,7 @@ pub struct TestComposer { pub composer: JoinHandle>, pub rollup_nodes: HashMap, pub sequencer: wiremock::MockServer, - pub setup_guard: MockGuard, + pub sequencer_mock: MockGrpcSequencer, pub grpc_collector_addr: SocketAddr, pub metrics_handle: metrics::Handle, } @@ -117,7 +120,8 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { rollup_nodes.insert((*id).to_string(), geth); rollups.push_str(&format!("{id}::{execution_url},")); } - let (sequencer, sequencer_setup_guard) = mock_sequencer::start().await; + let sequencer = mock_http_sequencer::start().await; + let grpc_server = mock_grpc_sequencer::MockGrpcSequencer::spawn().await; let sequencer_url = sequencer.uri(); let keyfile = NamedTempFile::new().unwrap(); (&keyfile) @@ -128,7 +132,8 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { api_listen_addr: "127.0.0.1:0".parse().unwrap(), sequencer_chain_id: "test-chain-1".to_string(), rollups, - sequencer_url, + sequencer_http_url: sequencer_url.to_string(), + sequencer_grpc_url: format!("http://{}", grpc_server.local_addr), private_key_file: keyfile.path().to_string_lossy().to_string(), sequencer_address_prefix: "astria".into(), block_time_ms: 2000, @@ -149,6 +154,11 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { .unwrap(); let metrics = Box::leak(Box::new(metrics)); + // prepare get nonce response + grpc_server + .mount_pending_nonce_response(0, "startup::wait_for_mempool()") + .await; + let (composer_addr, grpc_collector_addr, composer_handle) = { let composer = Composer::from_config(&config, metrics).await.unwrap(); let composer_addr = composer.local_addr(); @@ -163,7 +173,7 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { composer: composer_handle, rollup_nodes, sequencer, - setup_guard: sequencer_setup_guard, + sequencer_mock: grpc_server, grpc_collector_addr, metrics_handle, }