diff --git a/Cargo.lock b/Cargo.lock index bdaba407de..994af10955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10278,6 +10278,7 @@ dependencies = [ "starknet-types-core", "starknet_api", "starknet_batcher_types", + "starknet_l1_provider_types", "starknet_mempool_types", "starknet_sequencer_infra", "thiserror", @@ -10688,6 +10689,8 @@ dependencies = [ "starknet_gateway", "starknet_gateway_types", "starknet_http_server", + "starknet_l1_provider", + "starknet_l1_provider_types", "starknet_mempool", "starknet_mempool_p2p", "starknet_mempool_p2p_types", diff --git a/crates/starknet_batcher/Cargo.toml b/crates/starknet_batcher/Cargo.toml index 7f14eb059d..c9774a6e24 100644 --- a/crates/starknet_batcher/Cargo.toml +++ b/crates/starknet_batcher/Cargo.toml @@ -19,6 +19,7 @@ papyrus_storage.workspace = true serde.workspace = true starknet_api.workspace = true starknet_batcher_types.workspace = true +starknet_l1_provider_types.workspace = true starknet_mempool_types.workspace = true starknet_sequencer_infra.workspace = true thiserror.workspace = true @@ -35,4 +36,5 @@ mockall.workspace = true rstest.workspace = true starknet-types-core.workspace = true starknet_api = { workspace = true, features = ["testing"] } +starknet_l1_provider_types = { workspace = true, features = ["testing"] } starknet_mempool_types = { workspace = true, features = ["testing"] } diff --git a/crates/starknet_batcher/src/batcher.rs b/crates/starknet_batcher/src/batcher.rs index c6188a96de..faed2662dd 100644 --- a/crates/starknet_batcher/src/batcher.rs +++ b/crates/starknet_batcher/src/batcher.rs @@ -27,6 +27,7 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::errors::BatcherError; +use starknet_l1_provider_types::SharedL1ProviderClient; use starknet_mempool_types::communication::SharedMempoolClient; use starknet_mempool_types::mempool_types::CommitBlockArgs; use starknet_sequencer_infra::component_definitions::ComponentStarter; @@ -48,11 +49,7 @@ use crate::proposal_manager::{ ProposalManagerTrait, ProposalOutput, }; -use crate::transaction_provider::{ - DummyL1ProviderClient, - ProposeTransactionProvider, - ValidateTransactionProvider, -}; +use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider}; type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver; type InputStreamSender = tokio::sync::mpsc::Sender; @@ -61,6 +58,7 @@ pub struct Batcher { pub config: BatcherConfig, pub storage_reader: Arc, pub storage_writer: Box, + pub l1_provider_client: SharedL1ProviderClient, pub mempool_client: SharedMempoolClient, active_height: Option, @@ -76,6 +74,7 @@ impl Batcher { config: BatcherConfig, storage_reader: Arc, storage_writer: Box, + l1_provider_client: SharedL1ProviderClient, mempool_client: SharedMempoolClient, block_builder_factory: Box, proposal_manager: Box, @@ -84,6 +83,7 @@ impl Batcher { config: config.clone(), storage_reader, storage_writer, + l1_provider_client, mempool_client, active_height: None, block_builder_factory, @@ -139,8 +139,7 @@ impl Batcher { let tx_provider = ProposeTransactionProvider::new( self.mempool_client.clone(), - // TODO: use a real L1 provider client. - Arc::new(DummyL1ProviderClient), + self.l1_provider_client.clone(), self.config.max_l1_handler_txs_per_block_proposal, ); @@ -189,8 +188,7 @@ impl Batcher { let tx_provider = ValidateTransactionProvider { tx_receiver: input_tx_receiver, - // TODO: use a real L1 provider client. - l1_provider_client: Arc::new(DummyL1ProviderClient), + l1_provider_client: self.l1_provider_client.clone(), }; let (block_builder, abort_signal_sender) = self @@ -381,7 +379,11 @@ impl Batcher { } } -pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher { +pub fn create_batcher( + config: BatcherConfig, + mempool_client: SharedMempoolClient, + l1_provider_client: SharedL1ProviderClient, +) -> Batcher { let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone()) .expect("Failed to open batcher's storage"); @@ -397,6 +399,7 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient config, storage_reader, storage_writer, + l1_provider_client, mempool_client, block_builder_factory, proposal_manager, diff --git a/crates/starknet_batcher/src/batcher_test.rs b/crates/starknet_batcher/src/batcher_test.rs index 78ac8e0989..5be4238ef3 100644 --- a/crates/starknet_batcher/src/batcher_test.rs +++ b/crates/starknet_batcher/src/batcher_test.rs @@ -35,6 +35,7 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::errors::BatcherError; +use starknet_l1_provider_types::MockL1ProviderClient; use starknet_mempool_types::communication::MockMempoolClient; use starknet_mempool_types::mempool_types::CommitBlockArgs; @@ -100,6 +101,7 @@ struct MockDependencies { storage_reader: MockBatcherStorageReaderTrait, storage_writer: MockBatcherStorageWriterTrait, mempool_client: MockMempoolClient, + l1_provider_client: MockL1ProviderClient, proposal_manager: MockProposalManagerTraitWrapper, block_builder_factory: MockBlockBuilderFactoryTrait, } @@ -111,6 +113,7 @@ impl Default for MockDependencies { Self { storage_reader, storage_writer: MockBatcherStorageWriterTrait::new(), + l1_provider_client: MockL1ProviderClient::new(), mempool_client: MockMempoolClient::new(), proposal_manager: MockProposalManagerTraitWrapper::new(), block_builder_factory: MockBlockBuilderFactoryTrait::new(), @@ -123,6 +126,7 @@ fn create_batcher(mock_dependencies: MockDependencies) -> Batcher { BatcherConfig { outstream_content_buffer_size: STREAMING_CHUNK_SIZE, ..Default::default() }, Arc::new(mock_dependencies.storage_reader), Box::new(mock_dependencies.storage_writer), + Arc::new(mock_dependencies.l1_provider_client), Arc::new(mock_dependencies.mempool_client), Box::new(mock_dependencies.block_builder_factory), Box::new(mock_dependencies.proposal_manager), diff --git a/crates/starknet_batcher/src/transaction_provider.rs b/crates/starknet_batcher/src/transaction_provider.rs index 402cd51317..009f9d90ea 100644 --- a/crates/starknet_batcher/src/transaction_provider.rs +++ b/crates/starknet_batcher/src/transaction_provider.rs @@ -1,15 +1,15 @@ use std::cmp::min; -use std::sync::Arc; use std::vec; use async_trait::async_trait; #[cfg(test)] use mockall::automock; -use starknet_api::executable_transaction::{L1HandlerTransaction, Transaction}; +use starknet_api::executable_transaction::Transaction; use starknet_api::transaction::TransactionHash; +use starknet_l1_provider_types::errors::L1ProviderClientError; +use starknet_l1_provider_types::{SharedL1ProviderClient, ValidationStatus as L1ValidationStatus}; use starknet_mempool_types::communication::{MempoolClientError, SharedMempoolClient}; use thiserror::Error; -use tracing::warn; type TransactionProviderResult = Result; @@ -19,6 +19,8 @@ pub enum TransactionProviderError { MempoolError(#[from] MempoolClientError), #[error("L1Handler transaction validation failed for tx with hash {0}.")] L1HandlerTransactionValidationFailed(TransactionHash), + #[error(transparent)] + L1ProviderError(#[from] L1ProviderClientError), } #[derive(Debug, PartialEq)] @@ -64,8 +66,17 @@ impl ProposeTransactionProvider { } } - fn get_l1_handler_txs(&mut self, n_txs: usize) -> Vec { - self.l1_provider_client.get_txs(n_txs).into_iter().map(Transaction::L1Handler).collect() + async fn get_l1_handler_txs( + &mut self, + n_txs: usize, + ) -> TransactionProviderResult> { + Ok(self + .l1_provider_client + .get_txs(n_txs) + .await? + .into_iter() + .map(Transaction::L1Handler) + .collect()) } async fn get_mempool_txs( @@ -90,7 +101,7 @@ impl TransactionProvider for ProposeTransactionProvider { if self.phase == TxProviderPhase::L1 { let n_l1handler_txs_to_get = min(self.max_l1_handler_txs_per_block - self.n_l1handler_txs_so_far, n_txs); - let mut l1handler_txs = self.get_l1_handler_txs(n_l1handler_txs_to_get); + let mut l1handler_txs = self.get_l1_handler_txs(n_l1handler_txs_to_get).await?; self.n_l1handler_txs_so_far += l1handler_txs.len(); // Determine whether we need to switch to mempool phase. @@ -131,7 +142,9 @@ impl TransactionProvider for ValidateTransactionProvider { } for tx in &buffer { if let Transaction::L1Handler(tx) = tx { - if !self.l1_provider_client.validate(tx) { + let l1_validation_status = self.l1_provider_client.validate(tx.tx_hash).await?; + if l1_validation_status != L1ValidationStatus::Validated { + // TODO: add the validation status into the error. return Err(TransactionProviderError::L1HandlerTransactionValidationFailed( tx.tx_hash, )); @@ -141,28 +154,3 @@ impl TransactionProvider for ValidateTransactionProvider { Ok(NextTxs::Txs(buffer)) } } - -// TODO: Remove L1Provider code when the communication module of l1_provider is added. -#[cfg_attr(test, automock)] -#[async_trait] -pub trait L1ProviderClient: Send + Sync { - fn get_txs(&self, n_txs: usize) -> Vec; - fn validate(&self, tx: &L1HandlerTransaction) -> bool; -} - -pub type SharedL1ProviderClient = Arc; - -pub struct DummyL1ProviderClient; - -#[async_trait] -impl L1ProviderClient for DummyL1ProviderClient { - fn get_txs(&self, _n_txs: usize) -> Vec { - warn!("Dummy L1 provider client is used, no L1 transactions are provided."); - vec![] - } - - fn validate(&self, _tx: &L1HandlerTransaction) -> bool { - warn!("Dummy L1 provider client is used, tx is not really validated."); - true - } -} diff --git a/crates/starknet_batcher/src/transaction_provider_test.rs b/crates/starknet_batcher/src/transaction_provider_test.rs index fc7a5f341f..27a250f770 100644 --- a/crates/starknet_batcher/src/transaction_provider_test.rs +++ b/crates/starknet_batcher/src/transaction_provider_test.rs @@ -6,10 +6,10 @@ use rstest::{fixture, rstest}; use starknet_api::executable_transaction::{L1HandlerTransaction, Transaction}; use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs}; use starknet_api::tx_hash; +use starknet_l1_provider_types::{MockL1ProviderClient, ValidationStatus as L1ValidationStatus}; use starknet_mempool_types::communication::MockMempoolClient; use crate::transaction_provider::{ - MockL1ProviderClient, NextTxs, ProposeTransactionProvider, TransactionProvider, @@ -33,7 +33,7 @@ impl MockDependencies { self.l1_provider_client .expect_get_txs() .with(eq(n_to_request)) - .returning(move |_| vec![L1HandlerTransaction::default(); n_to_return]); + .returning(move |_| Ok(vec![L1HandlerTransaction::default(); n_to_return])); } fn expect_get_mempool_txs(&mut self, n_to_request: usize) { @@ -42,11 +42,11 @@ impl MockDependencies { }); } - fn expect_validate_l1handler(&mut self, tx: L1HandlerTransaction, result: bool) { + fn expect_validate_l1handler(&mut self, tx: L1HandlerTransaction, result: L1ValidationStatus) { self.l1_provider_client .expect_validate() - .withf(move |tx_arg| tx_arg == &tx) - .returning(move |_| result); + .withf(move |tx_arg| tx_arg == &tx.tx_hash) + .returning(move |_| Ok(result)); } async fn simulate_input_txs(&mut self, txs: Vec) { @@ -163,7 +163,7 @@ async fn no_more_l1_handler(mut mock_dependencies: MockDependencies) { #[tokio::test] async fn validate_flow(mut mock_dependencies: MockDependencies) { let test_tx = test_l1handler_tx(); - mock_dependencies.expect_validate_l1handler(test_tx.clone(), true); + mock_dependencies.expect_validate_l1handler(test_tx.clone(), L1ValidationStatus::Validated); mock_dependencies .simulate_input_txs(vec![ Transaction::L1Handler(test_tx), @@ -183,7 +183,8 @@ async fn validate_flow(mut mock_dependencies: MockDependencies) { #[tokio::test] async fn validate_fails(mut mock_dependencies: MockDependencies) { let test_tx = test_l1handler_tx(); - mock_dependencies.expect_validate_l1handler(test_tx.clone(), false); + mock_dependencies + .expect_validate_l1handler(test_tx.clone(), L1ValidationStatus::AlreadyIncludedOnL2); mock_dependencies .simulate_input_txs(vec![ Transaction::L1Handler(test_tx), diff --git a/crates/starknet_integration_tests/src/config_utils.rs b/crates/starknet_integration_tests/src/config_utils.rs index 6715c197b1..a3844c2c7c 100644 --- a/crates/starknet_integration_tests/src/config_utils.rs +++ b/crates/starknet_integration_tests/src/config_utils.rs @@ -138,6 +138,7 @@ pub async fn get_http_only_component_config(gateway_socket: SocketAddr) -> Compo mempool: get_disabled_component_config(), mempool_p2p: get_disabled_component_config(), state_sync: get_disabled_component_config(), + l1_provider: get_disabled_component_config(), } } diff --git a/crates/starknet_sequencer_node/Cargo.toml b/crates/starknet_sequencer_node/Cargo.toml index 91dbee0d88..c2707768ba 100644 --- a/crates/starknet_sequencer_node/Cargo.toml +++ b/crates/starknet_sequencer_node/Cargo.toml @@ -29,6 +29,8 @@ starknet_consensus_manager.workspace = true starknet_gateway.workspace = true starknet_gateway_types.workspace = true starknet_http_server.workspace = true +starknet_l1_provider.workspace = true +starknet_l1_provider_types.workspace = true starknet_mempool.workspace = true starknet_mempool_p2p.workspace = true starknet_mempool_p2p_types.workspace = true diff --git a/crates/starknet_sequencer_node/src/clients.rs b/crates/starknet_sequencer_node/src/clients.rs index 336bc32aec..fbd2021814 100644 --- a/crates/starknet_sequencer_node/src/clients.rs +++ b/crates/starknet_sequencer_node/src/clients.rs @@ -14,6 +14,8 @@ use starknet_gateway_types::communication::{ RemoteGatewayClient, SharedGatewayClient, }; +use starknet_l1_provider::communication::{LocalL1ProviderClient, RemoteL1ProviderClient}; +use starknet_l1_provider_types::{L1ProviderRequest, L1ProviderResponse, SharedL1ProviderClient}; use starknet_mempool_p2p_types::communication::{ LocalMempoolP2pPropagatorClient, MempoolP2pPropagatorRequest, @@ -49,6 +51,7 @@ pub struct SequencerNodeClients { mempool_p2p_propagator_client: Option>, state_sync_client: Option>, + l1_provider_client: Option>, } /// A macro to retrieve a shared client (either local or remote) from a specified field in a struct, @@ -141,6 +144,19 @@ impl SequencerNodeClients { } } + pub fn get_l1_provider_local_client( + &self, + ) -> Option> { + match &self.l1_provider_client { + Some(client) => client.get_local_client(), + None => None, + } + } + + pub fn get_l1_provider_shared_client(&self) -> Option { + get_shared_client!(self, l1_provider_client) + } + pub fn get_mempool_p2p_propagator_shared_client( &self, ) -> Option { @@ -280,11 +296,20 @@ pub fn create_node_clients( config.components.state_sync.remote_client_config ); + let l1_provider_client = create_client!( + &config.components.l1_provider.execution_mode, + LocalL1ProviderClient, + RemoteL1ProviderClient, + channels.take_l1_provider_tx(), + config.components.l1_provider.remote_client_config + ); + SequencerNodeClients { batcher_client, mempool_client, gateway_client, mempool_p2p_propagator_client, state_sync_client, + l1_provider_client, } } diff --git a/crates/starknet_sequencer_node/src/communication.rs b/crates/starknet_sequencer_node/src/communication.rs index 812b72ab99..dccec5ef94 100644 --- a/crates/starknet_sequencer_node/src/communication.rs +++ b/crates/starknet_sequencer_node/src/communication.rs @@ -1,5 +1,6 @@ use starknet_batcher_types::communication::BatcherRequestAndResponseSender; use starknet_gateway_types::communication::GatewayRequestAndResponseSender; +use starknet_l1_provider::communication::L1ProviderRequestAndResponseSender; use starknet_mempool_p2p_types::communication::MempoolP2pPropagatorRequestAndResponseSender; use starknet_mempool_types::communication::MempoolRequestAndResponseSender; use starknet_sequencer_infra::component_definitions::ComponentCommunication; @@ -9,6 +10,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; pub struct SequencerNodeCommunication { batcher_channel: ComponentCommunication, gateway_channel: ComponentCommunication, + l1_provider_channel: ComponentCommunication, mempool_channel: ComponentCommunication, mempool_p2p_propagator_channel: ComponentCommunication, @@ -32,6 +34,14 @@ impl SequencerNodeCommunication { self.gateway_channel.take_rx() } + pub fn take_l1_provider_tx(&mut self) -> Sender { + self.l1_provider_channel.take_tx() + } + + pub fn take_l1_provider_rx(&mut self) -> Receiver { + self.l1_provider_channel.take_rx() + } + pub fn take_mempool_p2p_propagator_tx( &mut self, ) -> Sender { @@ -68,6 +78,9 @@ pub fn create_node_channels() -> SequencerNodeCommunication { let (tx_gateway, rx_gateway) = channel::(DEFAULT_INVOCATIONS_QUEUE_SIZE); + let (tx_l1_provider, rx_l1_provider) = + channel::(DEFAULT_INVOCATIONS_QUEUE_SIZE); + let (tx_mempool, rx_mempool) = channel::(DEFAULT_INVOCATIONS_QUEUE_SIZE); @@ -80,6 +93,10 @@ pub fn create_node_channels() -> SequencerNodeCommunication { SequencerNodeCommunication { batcher_channel: ComponentCommunication::new(Some(tx_batcher), Some(rx_batcher)), gateway_channel: ComponentCommunication::new(Some(tx_gateway), Some(rx_gateway)), + l1_provider_channel: ComponentCommunication::new( + Some(tx_l1_provider), + Some(rx_l1_provider), + ), mempool_channel: ComponentCommunication::new(Some(tx_mempool), Some(rx_mempool)), mempool_p2p_propagator_channel: ComponentCommunication::new( Some(tx_mempool_p2p_propagator), diff --git a/crates/starknet_sequencer_node/src/components.rs b/crates/starknet_sequencer_node/src/components.rs index 7e110e1aea..6f8b1e011e 100644 --- a/crates/starknet_sequencer_node/src/components.rs +++ b/crates/starknet_sequencer_node/src/components.rs @@ -4,6 +4,7 @@ use starknet_batcher::batcher::{create_batcher, Batcher}; use starknet_consensus_manager::consensus_manager::ConsensusManager; use starknet_gateway::gateway::{create_gateway, Gateway}; use starknet_http_server::http_server::{create_http_server, HttpServer}; +use starknet_l1_provider::{create_l1_provider, L1Provider}; use starknet_mempool::communication::{create_mempool, MempoolCommunicationWrapper}; use starknet_mempool_p2p::create_p2p_propagator_and_runner; use starknet_mempool_p2p::propagator::MempoolP2pPropagator; @@ -29,6 +30,7 @@ pub struct SequencerNodeComponents { pub consensus_manager: Option, pub gateway: Option, pub http_server: Option, + pub l1_provider: Option, pub mempool: Option, pub monitoring_endpoint: Option, pub mempool_p2p_propagator: Option, @@ -46,7 +48,10 @@ pub fn create_node_components( | ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => { let mempool_client = clients.get_mempool_shared_client().expect("Mempool Client should be available"); - Some(create_batcher(config.batcher_config.clone(), mempool_client)) + let l1_provider_client = clients + .get_l1_provider_shared_client() + .expect("L1 Provider Client should be available"); + Some(create_batcher(config.batcher_config.clone(), mempool_client, l1_provider_client)) } ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None, }; @@ -139,11 +144,20 @@ pub fn create_node_components( } }; + let l1_provider = match config.components.l1_provider.execution_mode { + ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled + | ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => { + Some(create_l1_provider(config.l1_provider_config.clone())) + } + ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None, + }; + SequencerNodeComponents { batcher, consensus_manager, gateway, http_server, + l1_provider, mempool, monitoring_endpoint, mempool_p2p_propagator, diff --git a/crates/starknet_sequencer_node/src/config/component_config.rs b/crates/starknet_sequencer_node/src/config/component_config.rs index 1cde477042..1b469221e1 100644 --- a/crates/starknet_sequencer_node/src/config/component_config.rs +++ b/crates/starknet_sequencer_node/src/config/component_config.rs @@ -24,6 +24,8 @@ pub struct ComponentConfig { pub mempool_p2p: ReactiveComponentExecutionConfig, #[validate] pub state_sync: ReactiveComponentExecutionConfig, + #[validate] + pub l1_provider: ReactiveComponentExecutionConfig, // Active component configs. #[validate] @@ -42,6 +44,7 @@ impl SerializeConfig for ComponentConfig { append_sub_config_name(self.gateway.dump(), "gateway"), append_sub_config_name(self.http_server.dump(), "http_server"), append_sub_config_name(self.mempool.dump(), "mempool"), + append_sub_config_name(self.l1_provider.dump(), "l1_provider"), append_sub_config_name(self.mempool_p2p.dump(), "mempool_p2p"), append_sub_config_name(self.monitoring_endpoint.dump(), "monitoring_endpoint"), append_sub_config_name(self.state_sync.dump(), "state_sync"), diff --git a/crates/starknet_sequencer_node/src/config/node_config.rs b/crates/starknet_sequencer_node/src/config/node_config.rs index 5bc3500f4c..66ae8edb10 100644 --- a/crates/starknet_sequencer_node/src/config/node_config.rs +++ b/crates/starknet_sequencer_node/src/config/node_config.rs @@ -23,6 +23,7 @@ use starknet_batcher::VersionedConstantsOverrides; use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway::config::{GatewayConfig, RpcStateReaderConfig}; use starknet_http_server::config::HttpServerConfig; +use starknet_l1_provider::L1ProviderConfig; use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_monitoring_endpoint::config::MonitoringEndpointConfig; use starknet_sierra_compile::config::SierraToCasmCompilationConfig; @@ -123,6 +124,8 @@ pub struct SequencerNodeConfig { #[validate] pub compiler_config: SierraToCasmCompilationConfig, #[validate] + pub l1_provider_config: L1ProviderConfig, + #[validate] pub mempool_p2p_config: MempoolP2pConfig, #[validate] pub monitoring_endpoint_config: MonitoringEndpointConfig, diff --git a/crates/starknet_sequencer_node/src/servers.rs b/crates/starknet_sequencer_node/src/servers.rs index 690edc2fc8..8a70a300ae 100644 --- a/crates/starknet_sequencer_node/src/servers.rs +++ b/crates/starknet_sequencer_node/src/servers.rs @@ -6,6 +6,7 @@ use starknet_batcher::communication::{LocalBatcherServer, RemoteBatcherServer}; use starknet_consensus_manager::communication::ConsensusManagerServer; use starknet_gateway::communication::{LocalGatewayServer, RemoteGatewayServer}; use starknet_http_server::communication::HttpServer; +use starknet_l1_provider::communication::{LocalL1ProviderServer, RemoteL1ProviderServer}; use starknet_mempool::communication::{LocalMempoolServer, RemoteMempoolServer}; use starknet_mempool_p2p::propagator::{ LocalMempoolP2pPropagatorServer, @@ -35,6 +36,7 @@ use crate::config::node_config::SequencerNodeConfig; struct LocalServers { pub(crate) batcher: Option>, pub(crate) gateway: Option>, + pub(crate) l1_provider: Option>, pub(crate) mempool: Option>, pub(crate) mempool_p2p_propagator: Option>, } @@ -52,6 +54,7 @@ struct WrapperServers { pub struct RemoteServers { pub batcher: Option>, pub gateway: Option>, + pub l1_provider: Option>, pub mempool: Option>, pub mempool_p2p_propagator: Option>, } @@ -222,6 +225,11 @@ fn create_local_servers( components.gateway, communication.take_gateway_rx() ); + let l1_provider_server = create_local_server!( + &config.components.l1_provider.execution_mode, + components.l1_provider, + communication.take_l1_provider_rx() + ); let mempool_server = create_local_server!( &config.components.mempool.execution_mode, components.mempool, @@ -235,6 +243,7 @@ fn create_local_servers( LocalServers { batcher: batcher_server, gateway: gateway_server, + l1_provider: l1_provider_server, mempool: mempool_server, mempool_p2p_propagator: mempool_p2p_propagator_server, } @@ -258,6 +267,13 @@ pub fn create_remote_servers( config.components.gateway.remote_server_config ); + let l1_provider_client = clients.get_l1_provider_local_client(); + let l1_provider_server = create_remote_server!( + &config.components.l1_provider.execution_mode, + l1_provider_client, + config.components.l1_provider.remote_server_config + ); + let mempool_client = clients.get_mempool_local_client(); let mempool_server = create_remote_server!( &config.components.mempool.execution_mode, @@ -274,6 +290,7 @@ pub fn create_remote_servers( RemoteServers { batcher: batcher_server, gateway: gateway_server, + l1_provider: l1_provider_server, mempool: mempool_server, mempool_p2p_propagator: mempool_p2p_propagator_server, } @@ -355,6 +372,10 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res // MempoolP2pRunner server. let mempool_p2p_runner_future = get_server_future(servers.wrapper_servers.mempool_p2p_runner); + // L1Provider server. + let local_l1_provider_future = get_server_future(servers.local_servers.l1_provider); + let remote_l1_provider_future = get_server_future(servers.remote_servers.l1_provider); + // Start servers. let local_batcher_handle = tokio::spawn(local_batcher_future); let remote_batcher_handle = tokio::spawn(remote_batcher_future); @@ -368,6 +389,8 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res let local_mempool_p2p_propagator_handle = tokio::spawn(local_mempool_p2p_propagator_future); let remote_mempool_p2p_propagator_handle = tokio::spawn(remote_mempool_p2p_propagator_future); let mempool_p2p_runner_handle = tokio::spawn(mempool_p2p_runner_future); + let local_l1_provider_handle = tokio::spawn(local_l1_provider_future); + let remote_l1_provider_handle = tokio::spawn(remote_l1_provider_future); let result = tokio::select! { res = local_batcher_handle => { @@ -418,6 +441,14 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res error!("Mempool P2P Runner Server stopped."); res? } + res = local_l1_provider_handle => { + error!("Local L1 Provider Server stopped."); + res? + } + res = remote_l1_provider_handle => { + error!("Remote L1 Provider Server stopped."); + res? + } }; error!("Servers ended with unexpected Ok.");