Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(starknet_l1_provider): use client #2649

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/starknet_batcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ papyrus_storage.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_batcher_types.workspace = true
starknet_l1_provider_types.workspace = true
starknet_mempool_types.workspace = true
starknet_sequencer_infra.workspace = true
thiserror.workspace = true
Expand All @@ -35,4 +36,5 @@ mockall.workspace = true
rstest.workspace = true
starknet-types-core.workspace = true
starknet_api = { workspace = true, features = ["testing"] }
starknet_l1_provider_types = { workspace = true, features = ["testing"] }
starknet_mempool_types = { workspace = true, features = ["testing"] }
23 changes: 13 additions & 10 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use starknet_batcher_types::batcher_types::{
ValidateBlockInput,
};
use starknet_batcher_types::errors::BatcherError;
use starknet_l1_provider_types::SharedL1ProviderClient;
use starknet_mempool_types::communication::SharedMempoolClient;
use starknet_mempool_types::mempool_types::CommitBlockArgs;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
Expand All @@ -48,11 +49,7 @@ use crate::proposal_manager::{
ProposalManagerTrait,
ProposalOutput,
};
use crate::transaction_provider::{
DummyL1ProviderClient,
ProposeTransactionProvider,
ValidateTransactionProvider,
};
use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider};

type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver<Transaction>;
type InputStreamSender = tokio::sync::mpsc::Sender<Transaction>;
Expand All @@ -61,6 +58,7 @@ pub struct Batcher {
pub config: BatcherConfig,
pub storage_reader: Arc<dyn BatcherStorageReaderTrait>,
pub storage_writer: Box<dyn BatcherStorageWriterTrait>,
pub l1_provider_client: SharedL1ProviderClient,
pub mempool_client: SharedMempoolClient,

active_height: Option<BlockNumber>,
Expand All @@ -76,6 +74,7 @@ impl Batcher {
config: BatcherConfig,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
storage_writer: Box<dyn BatcherStorageWriterTrait>,
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,
proposal_manager: Box<dyn ProposalManagerTrait>,
Expand All @@ -84,6 +83,7 @@ impl Batcher {
config: config.clone(),
storage_reader,
storage_writer,
l1_provider_client,
mempool_client,
active_height: None,
block_builder_factory,
Expand Down Expand Up @@ -139,8 +139,7 @@ impl Batcher {

let tx_provider = ProposeTransactionProvider::new(
self.mempool_client.clone(),
// TODO: use a real L1 provider client.
Arc::new(DummyL1ProviderClient),
self.l1_provider_client.clone(),
self.config.max_l1_handler_txs_per_block_proposal,
);

Expand Down Expand Up @@ -189,8 +188,7 @@ impl Batcher {

let tx_provider = ValidateTransactionProvider {
tx_receiver: input_tx_receiver,
// TODO: use a real L1 provider client.
l1_provider_client: Arc::new(DummyL1ProviderClient),
l1_provider_client: self.l1_provider_client.clone(),
};

let (block_builder, abort_signal_sender) = self
Expand Down Expand Up @@ -381,7 +379,11 @@ impl Batcher {
}
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
pub fn create_batcher(
config: BatcherConfig,
mempool_client: SharedMempoolClient,
l1_provider_client: SharedL1ProviderClient,
) -> Batcher {
let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone())
.expect("Failed to open batcher's storage");

Expand All @@ -397,6 +399,7 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient
config,
storage_reader,
storage_writer,
l1_provider_client,
mempool_client,
block_builder_factory,
proposal_manager,
Expand Down
4 changes: 4 additions & 0 deletions crates/starknet_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use starknet_batcher_types::batcher_types::{
ValidateBlockInput,
};
use starknet_batcher_types::errors::BatcherError;
use starknet_l1_provider_types::MockL1ProviderClient;
use starknet_mempool_types::communication::MockMempoolClient;
use starknet_mempool_types::mempool_types::CommitBlockArgs;

Expand Down Expand Up @@ -100,6 +101,7 @@ struct MockDependencies {
storage_reader: MockBatcherStorageReaderTrait,
storage_writer: MockBatcherStorageWriterTrait,
mempool_client: MockMempoolClient,
l1_provider_client: MockL1ProviderClient,
proposal_manager: MockProposalManagerTraitWrapper,
block_builder_factory: MockBlockBuilderFactoryTrait,
}
Expand All @@ -111,6 +113,7 @@ impl Default for MockDependencies {
Self {
storage_reader,
storage_writer: MockBatcherStorageWriterTrait::new(),
l1_provider_client: MockL1ProviderClient::new(),
mempool_client: MockMempoolClient::new(),
proposal_manager: MockProposalManagerTraitWrapper::new(),
block_builder_factory: MockBlockBuilderFactoryTrait::new(),
Expand All @@ -123,6 +126,7 @@ fn create_batcher(mock_dependencies: MockDependencies) -> Batcher {
BatcherConfig { outstream_content_buffer_size: STREAMING_CHUNK_SIZE, ..Default::default() },
Arc::new(mock_dependencies.storage_reader),
Box::new(mock_dependencies.storage_writer),
Arc::new(mock_dependencies.l1_provider_client),
Arc::new(mock_dependencies.mempool_client),
Box::new(mock_dependencies.block_builder_factory),
Box::new(mock_dependencies.proposal_manager),
Expand Down
52 changes: 20 additions & 32 deletions crates/starknet_batcher/src/transaction_provider.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use std::cmp::min;
use std::sync::Arc;
use std::vec;

use async_trait::async_trait;
#[cfg(test)]
use mockall::automock;
use starknet_api::executable_transaction::{L1HandlerTransaction, Transaction};
use starknet_api::executable_transaction::Transaction;
use starknet_api::transaction::TransactionHash;
use starknet_l1_provider_types::errors::L1ProviderClientError;
use starknet_l1_provider_types::{SharedL1ProviderClient, ValidationStatus as L1ValidationStatus};
use starknet_mempool_types::communication::{MempoolClientError, SharedMempoolClient};
use thiserror::Error;
use tracing::warn;

#[derive(Clone, Debug, Error)]
pub enum TransactionProviderError {
#[error(transparent)]
MempoolError(#[from] MempoolClientError),
#[error("L1Handler transaction validation failed for tx with hash {0}.")]
L1HandlerTransactionValidationFailed(TransactionHash),
#[error(transparent)]
L1ProviderError(#[from] L1ProviderClientError),
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -62,8 +64,17 @@ impl ProposeTransactionProvider {
}
}

fn get_l1_handler_txs(&mut self, n_txs: usize) -> Vec<Transaction> {
self.l1_provider_client.get_txs(n_txs).into_iter().map(Transaction::L1Handler).collect()
async fn get_l1_handler_txs(
&mut self,
n_txs: usize,
) -> Result<Vec<Transaction>, TransactionProviderError> {
Ok(self
.l1_provider_client
.get_txs(n_txs)
.await?
.into_iter()
.map(Transaction::L1Handler)
.collect())
}

async fn get_mempool_txs(
Expand All @@ -88,7 +99,7 @@ impl TransactionProvider for ProposeTransactionProvider {
if self.phase == TxProviderPhase::L1 {
let n_l1handler_txs_to_get =
min(self.max_l1_handler_txs_per_block - self.n_l1handler_txs_so_far, n_txs);
let mut l1handler_txs = self.get_l1_handler_txs(n_l1handler_txs_to_get);
let mut l1handler_txs = self.get_l1_handler_txs(n_l1handler_txs_to_get).await?;
self.n_l1handler_txs_so_far += l1handler_txs.len();

// Determine whether we need to switch to mempool phase.
Expand Down Expand Up @@ -129,7 +140,9 @@ impl TransactionProvider for ValidateTransactionProvider {
}
for tx in &buffer {
if let Transaction::L1Handler(tx) = tx {
if !self.l1_provider_client.validate(tx) {
let l1_validation_status = self.l1_provider_client.validate(tx.tx_hash).await?;
if l1_validation_status != L1ValidationStatus::Validated {
// TODO: add the validation status into the error.
return Err(TransactionProviderError::L1HandlerTransactionValidationFailed(
tx.tx_hash,
));
Expand All @@ -139,28 +152,3 @@ impl TransactionProvider for ValidateTransactionProvider {
Ok(NextTxs::Txs(buffer))
}
}

// TODO: Remove L1Provider code when the communication module of l1_provider is added.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait L1ProviderClient: Send + Sync {
fn get_txs(&self, n_txs: usize) -> Vec<L1HandlerTransaction>;
fn validate(&self, tx: &L1HandlerTransaction) -> bool;
}

pub type SharedL1ProviderClient = Arc<dyn L1ProviderClient>;

pub struct DummyL1ProviderClient;

#[async_trait]
impl L1ProviderClient for DummyL1ProviderClient {
fn get_txs(&self, _n_txs: usize) -> Vec<L1HandlerTransaction> {
warn!("Dummy L1 provider client is used, no L1 transactions are provided.");
vec![]
}

fn validate(&self, _tx: &L1HandlerTransaction) -> bool {
warn!("Dummy L1 provider client is used, tx is not really validated.");
true
}
}
15 changes: 8 additions & 7 deletions crates/starknet_batcher/src/transaction_provider_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use rstest::{fixture, rstest};
use starknet_api::executable_transaction::{L1HandlerTransaction, Transaction};
use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs};
use starknet_api::tx_hash;
use starknet_l1_provider_types::{MockL1ProviderClient, ValidationStatus as L1ValidationStatus};
use starknet_mempool_types::communication::MockMempoolClient;

use crate::transaction_provider::{
MockL1ProviderClient,
NextTxs,
ProposeTransactionProvider,
TransactionProvider,
Expand All @@ -33,7 +33,7 @@ impl MockDependencies {
self.l1_provider_client
.expect_get_txs()
.with(eq(n_to_request))
.returning(move |_| vec![L1HandlerTransaction::default(); n_to_return]);
.returning(move |_| Ok(vec![L1HandlerTransaction::default(); n_to_return]));
}

fn expect_get_mempool_txs(&mut self, n_to_request: usize) {
Expand All @@ -42,11 +42,11 @@ impl MockDependencies {
});
}

fn expect_validate_l1handler(&mut self, tx: L1HandlerTransaction, result: bool) {
fn expect_validate_l1handler(&mut self, tx: L1HandlerTransaction, result: L1ValidationStatus) {
self.l1_provider_client
.expect_validate()
.withf(move |tx_arg| tx_arg == &tx)
.returning(move |_| result);
.withf(move |tx_arg| tx_arg == &tx.tx_hash)
.returning(move |_| Ok(result));
}

async fn simulate_input_txs(&mut self, txs: Vec<Transaction>) {
Expand Down Expand Up @@ -163,7 +163,7 @@ async fn no_more_l1_handler(mut mock_dependencies: MockDependencies) {
#[tokio::test]
async fn validate_flow(mut mock_dependencies: MockDependencies) {
let test_tx = test_l1handler_tx();
mock_dependencies.expect_validate_l1handler(test_tx.clone(), true);
mock_dependencies.expect_validate_l1handler(test_tx.clone(), L1ValidationStatus::Validated);
mock_dependencies
.simulate_input_txs(vec![
Transaction::L1Handler(test_tx),
Expand All @@ -183,7 +183,8 @@ async fn validate_flow(mut mock_dependencies: MockDependencies) {
#[tokio::test]
async fn validate_fails(mut mock_dependencies: MockDependencies) {
let test_tx = test_l1handler_tx();
mock_dependencies.expect_validate_l1handler(test_tx.clone(), false);
mock_dependencies
.expect_validate_l1handler(test_tx.clone(), L1ValidationStatus::AlreadyIncludedOnL2);
mock_dependencies
.simulate_input_txs(vec![
Transaction::L1Handler(test_tx),
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_integration_tests/src/config_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub async fn get_http_only_component_config(gateway_socket: SocketAddr) -> Compo
mempool: get_disabled_component_config(),
mempool_p2p: get_disabled_component_config(),
state_sync: get_disabled_component_config(),
l1_provider: get_disabled_component_config(),
}
}

Expand Down
8 changes: 8 additions & 0 deletions crates/starknet_l1_provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ edition.workspace = true
repository.workspace = true
license.workspace = true

[features]
testing = []

[dependencies]
async-trait.workspace = true
indexmap.workspace = true
papyrus_base_layer.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_l1_provider_types.workspace = true
starknet_sequencer_infra.workspace = true
thiserror.workspace = true
tracing.workspace = true
validator.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
31 changes: 31 additions & 0 deletions crates/starknet_l1_provider/src/communication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use async_trait::async_trait;
use starknet_l1_provider_types::{L1ProviderRequest, L1ProviderResponse};
use starknet_sequencer_infra::component_client::{LocalComponentClient, RemoteComponentClient};
use starknet_sequencer_infra::component_definitions::{
ComponentRequestAndResponseSender,
ComponentRequestHandler,
};
use starknet_sequencer_infra::component_server::{LocalComponentServer, RemoteComponentServer};
use tracing::instrument;

use crate::L1Provider;

pub type LocalL1ProviderServer =
LocalComponentServer<L1Provider, L1ProviderRequest, L1ProviderResponse>;
pub type RemoteL1ProviderServer = RemoteComponentServer<L1ProviderRequest, L1ProviderResponse>;
pub type L1ProviderRequestAndResponseSender =
ComponentRequestAndResponseSender<L1ProviderRequest, L1ProviderResponse>;
pub type LocalL1ProviderClient = LocalComponentClient<L1ProviderRequest, L1ProviderResponse>;
pub type RemoteL1ProviderClient = RemoteComponentClient<L1ProviderRequest, L1ProviderResponse>;

#[async_trait]
impl ComponentRequestHandler<L1ProviderRequest, L1ProviderResponse> for L1Provider {
#[instrument(skip(self))]
async fn handle_request(&mut self, request: L1ProviderRequest) -> L1ProviderResponse {
match request {
L1ProviderRequest::GetTransactions(n_txs) => {
L1ProviderResponse::GetTransactions(self.get_txs(n_txs))
}
}
}
}
Loading
Loading