From eb62875f2a32d1760b10e6979c727e85fee42714 Mon Sep 17 00:00:00 2001 From: eitanm-starkware Date: Tue, 10 Dec 2024 11:50:00 +0200 Subject: [PATCH 1/3] chore(papyrus_p2p_sync): add block data receiver to p2psyncclient --- Cargo.lock | 1 + crates/papyrus_node/src/run.rs | 2 ++ crates/papyrus_p2p_sync/Cargo.toml | 1 + crates/papyrus_p2p_sync/src/client/mod.rs | 8 +++++++- crates/papyrus_p2p_sync/src/client/test_utils.rs | 2 ++ crates/starknet_state_sync/src/runner/mod.rs | 3 ++- 6 files changed, 15 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 849e52446a..cff774fa61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7575,6 +7575,7 @@ dependencies = [ "serde", "starknet-types-core", "starknet_api", + "starknet_state_sync_types", "static_assertions", "thiserror", "tokio", diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index e26b379759..6cf1b19356 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::StreamExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; use papyrus_common::pending_classes::PendingClasses; @@ -304,6 +305,7 @@ async fn spawn_sync_client( storage_reader, storage_writer, p2p_sync_client_channels, + futures::stream::pending().boxed(), ); tokio::spawn(async move { Ok(p2p_sync.run().await?) }) } diff --git a/crates/papyrus_p2p_sync/Cargo.toml b/crates/papyrus_p2p_sync/Cargo.toml index 653fbd259b..f39a8bc76b 100644 --- a/crates/papyrus_p2p_sync/Cargo.toml +++ b/crates/papyrus_p2p_sync/Cargo.toml @@ -22,6 +22,7 @@ papyrus_storage.workspace = true rand.workspace = true serde.workspace = true starknet_api.workspace = true +starknet_state_sync_types.workspace = true starknet-types-core.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 01e13465f7..0ff1dd955e 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -19,6 +19,7 @@ use std::time::Duration; use class::ClassStreamBuilder; use futures::channel::mpsc::SendError; +use futures::stream::BoxStream; use futures::Stream; use header::HeaderStreamBuilder; use papyrus_common::pending_classes::ApiContractClass; @@ -40,11 +41,13 @@ use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use starknet_api::core::ClassHash; use starknet_api::transaction::FullTransaction; +use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; use stream_builder::{DataStreamBuilder, DataStreamResult}; use tokio_stream::StreamExt; use tracing::instrument; use transaction::TransactionStreamFactory; + const STEP: u64 = 1; const ALLOWED_SIGNATURES_LENGTH: usize = 1; @@ -226,6 +229,8 @@ pub struct P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, + #[allow(dead_code)] + internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, } impl P2PSyncClient { @@ -234,8 +239,9 @@ impl P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, + internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, ) -> Self { - Self { config, storage_reader, storage_writer, p2p_sync_channels } + Self { config, storage_reader, storage_writer, p2p_sync_channels, internal_blocks_receiver } } #[instrument(skip(self), level = "debug", err)] diff --git a/crates/papyrus_p2p_sync/src/client/test_utils.rs b/crates/papyrus_p2p_sync/src/client/test_utils.rs index 006ce510f9..038dd78191 100644 --- a/crates/papyrus_p2p_sync/src/client/test_utils.rs +++ b/crates/papyrus_p2p_sync/src/client/test_utils.rs @@ -112,6 +112,7 @@ pub fn setup() -> TestArgs { storage_reader.clone(), storage_writer, p2p_sync_channels, + futures::stream::pending().boxed(), ); TestArgs { p2p_sync, @@ -194,6 +195,7 @@ pub async fn run_test(max_query_lengths: HashMap, actions: Vec Date: Mon, 9 Dec 2024 15:26:35 +0200 Subject: [PATCH 2/3] feat(papyrus_p2p_sync): handle internal block in datastreambuilder --- .../src/client/stream_builder.rs | 57 +++++++++++++++---- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 36718a939d..257bcb2cd0 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -1,11 +1,12 @@ use std::cmp::min; +use std::collections::HashMap; use std::time::Duration; use async_stream::stream; use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use papyrus_network::network_manager::{ClientResponsesManager, SqmrClientSender}; use papyrus_protobuf::converters::ProtobufConversionError; use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query}; @@ -53,10 +54,35 @@ where fn get_start_block_number(storage_reader: &StorageReader) -> Result; + fn get_internal_blocks( + internal_blocks_received: &mut HashMap, + internal_block_receiver: &mut Option>, + current_block_number: BlockNumber, + ) -> Option { + if let Some(block) = internal_blocks_received.remove(¤t_block_number) { + return Some(block); + } + if let Some(internal_block_receiver) = internal_block_receiver { + while let Some((block_number, block_data)) = internal_block_receiver + .next() + .now_or_never() + .map(|now_or_never_res| now_or_never_res.expect("internal block receiver closed")) + { + if block_number >= current_block_number { + if block_number == current_block_number { + return Some(block_data); + } + internal_blocks_received.insert(block_number, block_data); + } + } + } + None + } + fn create_stream( mut sqmr_sender: SqmrClientSender>, storage_reader: StorageReader, - _internal_block_receiver: Option>, + mut internal_block_receiver: Option>, wait_period_for_new_data: Duration, num_blocks_per_query: u64, stop_sync_at_block_number: Option, @@ -67,6 +93,7 @@ where { stream! { let mut current_block_number = Self::get_start_block_number(&storage_reader)?; + let mut internal_blocks_received = HashMap::new(); 'send_query_and_parse_responses: loop { let limit = match Self::BLOCK_NUMBER_LIMIT { BlockNumberLimit::Unlimited => num_blocks_per_query, @@ -92,6 +119,16 @@ where current_block_number.0, end_block_number, ); + let end_block_number = min(end_block_number, stop_sync_at_block_number.map(|block_number| block_number.0).unwrap_or(end_block_number)); + while current_block_number.0 < end_block_number { + if let Some(block) = Self::get_internal_blocks(&mut internal_blocks_received, &mut internal_block_receiver, current_block_number) + { + yield Ok(Box::::from(Box::new(block))); + current_block_number = current_block_number.unchecked_next(); + } else { + break; + } + } // TODO(shahak): Use the report callback. let mut client_response_manager = sqmr_sender .send_new_query( @@ -101,9 +138,7 @@ where limit, step: STEP, }) - ) - .await?; - + ).await?; while current_block_number.0 < end_block_number { match Self::parse_data_for_block( &mut client_response_manager, current_block_number, &storage_reader @@ -134,12 +169,12 @@ where } info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number); current_block_number = current_block_number.unchecked_next(); - if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { - current_block_number >= stop_sync_at_block_number - }) { - info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); - return; - } + } + if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { + current_block_number >= stop_sync_at_block_number + }) { + info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); + return; } // Consume the None message signaling the end of the query. From 5b59b7d4e500441f533a754a4d3d738c574936c5 Mon Sep 17 00:00:00 2001 From: eitanm-starkware Date: Tue, 10 Dec 2024 19:16:59 +0200 Subject: [PATCH 3/3] chore(papyrus_p2p_sync): create internal blocks senders and receivers structs --- crates/papyrus_p2p_sync/src/client/mod.rs | 77 ++++++++++++++++++++--- 1 file changed, 69 insertions(+), 8 deletions(-) diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 0ff1dd955e..d085683ca9 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use std::time::Duration; use class::ClassStreamBuilder; -use futures::channel::mpsc::SendError; +use futures::channel::mpsc::{Receiver, SendError, Sender}; use futures::stream::BoxStream; use futures::Stream; use header::HeaderStreamBuilder; @@ -38,8 +38,9 @@ use papyrus_protobuf::sync::{ }; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use serde::{Deserialize, Serialize}; -use starknet_api::block::BlockNumber; +use starknet_api::block::{BlockBody, BlockNumber}; use starknet_api::core::ClassHash; +use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff}; use starknet_api::transaction::FullTransaction; use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; @@ -179,15 +180,16 @@ impl P2PSyncClientChannels { ) -> Self { Self { header_sender, state_diff_sender, transaction_sender, class_sender } } - pub(crate) fn create_stream( + fn create_stream( self, storage_reader: StorageReader, config: P2PSyncClientConfig, + internal_blocks_receivers: InternalBlocksReceivers, ) -> impl Stream + Send + 'static { let header_stream = HeaderStreamBuilder::create_stream( self.header_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.header_receiver), config.wait_period_for_new_data, config.num_headers_per_query, config.stop_sync_at_block_number, @@ -196,7 +198,7 @@ impl P2PSyncClientChannels { let state_diff_stream = StateDiffStreamBuilder::create_stream( self.state_diff_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.state_diff_receiver), config.wait_period_for_new_data, config.num_block_state_diffs_per_query, config.stop_sync_at_block_number, @@ -205,7 +207,7 @@ impl P2PSyncClientChannels { let transaction_stream = TransactionStreamFactory::create_stream( self.transaction_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.transaction_receiver), config.wait_period_for_new_data, config.num_block_transactions_per_query, config.stop_sync_at_block_number, @@ -246,12 +248,71 @@ impl P2PSyncClient { #[instrument(skip(self), level = "debug", err)] pub async fn run(mut self) -> Result<(), P2PSyncClientError> { - let mut data_stream = - self.p2p_sync_channels.create_stream(self.storage_reader.clone(), self.config); + let internal_blocks_channels = InternalBlocksChannels::new(); + self.create_internal_blocks_sender_task(internal_blocks_channels.senders); + let mut data_stream = self.p2p_sync_channels.create_stream( + self.storage_reader.clone(), + self.config, + internal_blocks_channels.receivers, + ); loop { let data = data_stream.next().await.expect("Sync data stream should never end")?; data.write_to_storage(&mut self.storage_writer)?; } } + + fn create_internal_blocks_sender_task( + &self, + #[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move {}) + } +} + +struct InternalBlocksReceivers { + header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>, + state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>, + transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>, + #[allow(dead_code)] + class_receiver: + Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, +} + +#[allow(dead_code)] +struct InternalBlocksSenders { + header_sender: Sender<(BlockNumber, SignedBlockHeader)>, + state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>, + transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>, + #[allow(dead_code)] + class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, +} + +struct InternalBlocksChannels { + receivers: InternalBlocksReceivers, + senders: InternalBlocksSenders, +} + +impl InternalBlocksChannels { + pub fn new() -> Self { + let (header_sender, header_receiver) = futures::channel::mpsc::channel(100); + let (state_diff_sender, state_diff_receiver) = futures::channel::mpsc::channel(100); + let (transaction_sender, transaction_receiver) = futures::channel::mpsc::channel(100); + let (class_sender, class_receiver) = futures::channel::mpsc::channel(100); + + Self { + receivers: InternalBlocksReceivers { + header_receiver, + state_diff_receiver, + transaction_receiver, + class_receiver, + }, + senders: InternalBlocksSenders { + header_sender, + state_diff_sender, + transaction_sender, + class_sender, + }, + } + } }