From 1b4c56a221ef74cca5cba9c8b157e434410494e6 Mon Sep 17 00:00:00 2001 From: eitanm-starkware Date: Tue, 10 Dec 2024 11:50:00 +0200 Subject: [PATCH 1/3] chore(papyrus_p2p_sync): add block data receiver to p2psyncclient --- Cargo.lock | 1 + crates/papyrus_node/src/run.rs | 2 ++ crates/papyrus_p2p_sync/Cargo.toml | 1 + crates/papyrus_p2p_sync/src/client/mod.rs | 8 +++++++- crates/papyrus_p2p_sync/src/client/test_utils.rs | 2 ++ crates/starknet_state_sync/src/runner/mod.rs | 3 ++- 6 files changed, 15 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17601d068f..cd1a9911a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7575,6 +7575,7 @@ dependencies = [ "serde", "starknet-types-core", "starknet_api", + "starknet_state_sync_types", "static_assertions", "thiserror", "tokio", diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index e26b379759..6cf1b19356 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::StreamExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; use papyrus_common::pending_classes::PendingClasses; @@ -304,6 +305,7 @@ async fn spawn_sync_client( storage_reader, storage_writer, p2p_sync_client_channels, + futures::stream::pending().boxed(), ); tokio::spawn(async move { Ok(p2p_sync.run().await?) }) } diff --git a/crates/papyrus_p2p_sync/Cargo.toml b/crates/papyrus_p2p_sync/Cargo.toml index 653fbd259b..f39a8bc76b 100644 --- a/crates/papyrus_p2p_sync/Cargo.toml +++ b/crates/papyrus_p2p_sync/Cargo.toml @@ -22,6 +22,7 @@ papyrus_storage.workspace = true rand.workspace = true serde.workspace = true starknet_api.workspace = true +starknet_state_sync_types.workspace = true starknet-types-core.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 01e13465f7..0ff1dd955e 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -19,6 +19,7 @@ use std::time::Duration; use class::ClassStreamBuilder; use futures::channel::mpsc::SendError; +use futures::stream::BoxStream; use futures::Stream; use header::HeaderStreamBuilder; use papyrus_common::pending_classes::ApiContractClass; @@ -40,11 +41,13 @@ use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use starknet_api::core::ClassHash; use starknet_api::transaction::FullTransaction; +use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; use stream_builder::{DataStreamBuilder, DataStreamResult}; use tokio_stream::StreamExt; use tracing::instrument; use transaction::TransactionStreamFactory; + const STEP: u64 = 1; const ALLOWED_SIGNATURES_LENGTH: usize = 1; @@ -226,6 +229,8 @@ pub struct P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, + #[allow(dead_code)] + internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, } impl P2PSyncClient { @@ -234,8 +239,9 @@ impl P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, + internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, ) -> Self { - Self { config, storage_reader, storage_writer, p2p_sync_channels } + Self { config, storage_reader, storage_writer, p2p_sync_channels, internal_blocks_receiver } } #[instrument(skip(self), level = "debug", err)] diff --git a/crates/papyrus_p2p_sync/src/client/test_utils.rs b/crates/papyrus_p2p_sync/src/client/test_utils.rs index 006ce510f9..038dd78191 100644 --- a/crates/papyrus_p2p_sync/src/client/test_utils.rs +++ b/crates/papyrus_p2p_sync/src/client/test_utils.rs @@ -112,6 +112,7 @@ pub fn setup() -> TestArgs { storage_reader.clone(), storage_writer, p2p_sync_channels, + futures::stream::pending().boxed(), ); TestArgs { p2p_sync, @@ -194,6 +195,7 @@ pub async fn run_test(max_query_lengths: HashMap, actions: Vec Date: Tue, 10 Dec 2024 15:21:27 +0200 Subject: [PATCH 2/3] feat(starknet_state_sync): pass new internal blocks from state sync to p2p sync client --- crates/starknet_state_sync/src/lib.rs | 20 +++++++++++++++---- crates/starknet_state_sync/src/runner/mod.rs | 12 ++++++++--- .../starknet_state_sync_types/src/errors.rs | 2 ++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/crates/starknet_state_sync/src/lib.rs b/crates/starknet_state_sync/src/lib.rs index 1f86107806..8c371ca7ef 100644 --- a/crates/starknet_state_sync/src/lib.rs +++ b/crates/starknet_state_sync/src/lib.rs @@ -2,6 +2,8 @@ pub mod config; pub mod runner; use async_trait::async_trait; +use futures::channel::mpsc::{channel, Sender}; +use futures::SinkExt; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::state::StateStorageReader; use papyrus_storage::StorageReader; @@ -12,18 +14,23 @@ use starknet_state_sync_types::communication::{ StateSyncResponse, StateSyncResult, }; +use starknet_state_sync_types::errors::StateSyncError; use starknet_state_sync_types::state_sync_types::SyncBlock; use crate::config::StateSyncConfig; use crate::runner::StateSyncRunner; +const BUFFER_SIZE: usize = 100000; + pub fn create_state_sync_and_runner(config: StateSyncConfig) -> (StateSync, StateSyncRunner) { - let (state_sync_runner, storage_reader) = StateSyncRunner::new(config); - (StateSync { storage_reader }, state_sync_runner) + let (new_block_sender, new_block_receiver) = channel(BUFFER_SIZE); + let (state_sync_runner, storage_reader) = StateSyncRunner::new(config, new_block_receiver); + (StateSync { storage_reader, new_block_sender }, state_sync_runner) } pub struct StateSync { storage_reader: StorageReader, + new_block_sender: Sender<(BlockNumber, SyncBlock)>, } // TODO(shahak): Have StateSyncRunner call StateSync instead of the opposite once we stop supporting @@ -35,8 +42,13 @@ impl ComponentRequestHandler for StateSync StateSyncRequest::GetBlock(block_number) => { StateSyncResponse::GetBlock(self.get_block(block_number)) } - StateSyncRequest::AddNewBlock(_block_number, _sync_block) => { - todo!() + StateSyncRequest::AddNewBlock(block_number, sync_block) => { + StateSyncResponse::AddNewBlock( + self.new_block_sender + .send((block_number, sync_block)) + .await + .map_err(|_| StateSyncError::P2PSyncClientError), + ) } } } diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index 6e333adcff..7733d42b8a 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -2,15 +2,18 @@ mod test; use async_trait::async_trait; +use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use papyrus_network::network_manager::{self, NetworkError}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; use papyrus_storage::{open_storage, StorageReader}; +use starknet_api::block::BlockNumber; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; +use starknet_state_sync_types::state_sync_types::SyncBlock; use crate::config::StateSyncConfig; @@ -37,7 +40,10 @@ impl ComponentStarter for StateSyncRunner { } impl StateSyncRunner { - pub fn new(config: StateSyncConfig) -> (Self, StorageReader) { + pub fn new( + config: StateSyncConfig, + new_block_receiver: Receiver<(BlockNumber, SyncBlock)>, + ) -> (Self, StorageReader) { let (storage_reader, storage_writer) = open_storage(config.storage_config).expect("StateSyncRunner failed opening storage"); @@ -65,7 +71,7 @@ impl StateSyncRunner { storage_reader.clone(), storage_writer, p2p_sync_client_channels, - futures::stream::pending().boxed(), + Box::pin(new_block_receiver), ); let header_server_receiver = network_manager diff --git a/crates/starknet_state_sync_types/src/errors.rs b/crates/starknet_state_sync_types/src/errors.rs index f5842256ac..f37d039aa9 100644 --- a/crates/starknet_state_sync_types/src/errors.rs +++ b/crates/starknet_state_sync_types/src/errors.rs @@ -10,6 +10,8 @@ pub enum StateSyncError { // We put the string of the error instead. #[error("Unexpected storage error: {0}")] StorageError(String), + #[error("Failed to send message to P2pSyncClient")] + P2PSyncClientError, } impl From for StateSyncError { From 365db407b55aa06b16fbc95e5c11b0ce00303fbd Mon Sep 17 00:00:00 2001 From: Alon Lukatch Date: Tue, 10 Dec 2024 18:15:56 +0200 Subject: [PATCH 3/3] chore: fix CR comments --- Cargo.lock | 1 + crates/starknet_state_sync/src/lib.rs | 2 +- crates/starknet_state_sync/src/runner/mod.rs | 4 ++-- crates/starknet_state_sync_types/Cargo.toml | 1 + crates/starknet_state_sync_types/src/errors.rs | 11 +++++++++-- 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd1a9911a9..d90e52ebda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10737,6 +10737,7 @@ name = "starknet_state_sync_types" version = "0.0.0" dependencies = [ "async-trait", + "futures", "papyrus_proc_macros", "papyrus_storage", "serde", diff --git a/crates/starknet_state_sync/src/lib.rs b/crates/starknet_state_sync/src/lib.rs index 8c371ca7ef..2aa67cd9f5 100644 --- a/crates/starknet_state_sync/src/lib.rs +++ b/crates/starknet_state_sync/src/lib.rs @@ -47,7 +47,7 @@ impl ComponentRequestHandler for StateSync self.new_block_sender .send((block_number, sync_block)) .await - .map_err(|_| StateSyncError::P2PSyncClientError), + .map_err(StateSyncError::from), ) } } diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index 7733d42b8a..ee32c8e897 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -4,7 +4,7 @@ mod test; use async_trait::async_trait; use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use papyrus_network::network_manager::{self, NetworkError}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; @@ -71,7 +71,7 @@ impl StateSyncRunner { storage_reader.clone(), storage_writer, p2p_sync_client_channels, - Box::pin(new_block_receiver), + new_block_receiver.boxed(), ); let header_server_receiver = network_manager diff --git a/crates/starknet_state_sync_types/Cargo.toml b/crates/starknet_state_sync_types/Cargo.toml index f9204e6338..095cc47ad4 100644 --- a/crates/starknet_state_sync_types/Cargo.toml +++ b/crates/starknet_state_sync_types/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] async-trait.workspace = true +futures.workspace = true papyrus_proc_macros.workspace = true papyrus_storage.workspace = true serde = { workspace = true, features = ["derive"] } diff --git a/crates/starknet_state_sync_types/src/errors.rs b/crates/starknet_state_sync_types/src/errors.rs index f37d039aa9..15406dd103 100644 --- a/crates/starknet_state_sync_types/src/errors.rs +++ b/crates/starknet_state_sync_types/src/errors.rs @@ -1,3 +1,4 @@ +use futures::channel::mpsc::SendError; use papyrus_storage::StorageError; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -10,8 +11,8 @@ pub enum StateSyncError { // We put the string of the error instead. #[error("Unexpected storage error: {0}")] StorageError(String), - #[error("Failed to send message to P2pSyncClient")] - P2PSyncClientError, + #[error("Error while sending SyncBlock from StateSync to P2pSyncClient")] + SendError(String), } impl From for StateSyncError { @@ -19,3 +20,9 @@ impl From for StateSyncError { StateSyncError::StorageError(error.to_string()) } } + +impl From for StateSyncError { + fn from(error: SendError) -> Self { + StateSyncError::SendError(error.to_string()) + } +}