Skip to content

Commit

Permalink
feat(starknet_state_sync): pass new internal blocks from state sync t…
Browse files Browse the repository at this point in the history
…o p2p sync client
  • Loading branch information
AlonLStarkWare committed Dec 10, 2024
1 parent 6a4a5d2 commit 6d8f02a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
20 changes: 16 additions & 4 deletions crates/starknet_state_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub mod config;
pub mod runner;

use async_trait::async_trait;
use futures::channel::mpsc::{channel, Sender};
use futures::SinkExt;
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::StorageReader;
Expand All @@ -12,18 +14,23 @@ use starknet_state_sync_types::communication::{
StateSyncResponse,
StateSyncResult,
};
use starknet_state_sync_types::errors::StateSyncError;
use starknet_state_sync_types::state_sync_types::SyncBlock;

use crate::config::StateSyncConfig;
use crate::runner::StateSyncRunner;

const BUFFER_SIZE: usize = 100000;

pub fn create_state_sync_and_runner(config: StateSyncConfig) -> (StateSync, StateSyncRunner) {
let (state_sync_runner, storage_reader) = StateSyncRunner::new(config);
(StateSync { storage_reader }, state_sync_runner)
let (new_block_sender, new_block_receiver) = channel(BUFFER_SIZE);
let (state_sync_runner, storage_reader) = StateSyncRunner::new(config, new_block_receiver);
(StateSync { storage_reader, new_block_sender }, state_sync_runner)
}

pub struct StateSync {
storage_reader: StorageReader,
new_block_sender: Sender<(BlockNumber, SyncBlock)>,
}

// TODO(shahak): Have StateSyncRunner call StateSync instead of the opposite once we stop supporting
Expand All @@ -35,8 +42,13 @@ impl ComponentRequestHandler<StateSyncRequest, StateSyncResponse> for StateSync
StateSyncRequest::GetBlock(block_number) => {
StateSyncResponse::GetBlock(self.get_block(block_number))
}
StateSyncRequest::AddNewBlock(_block_number, _sync_block) => {
todo!()
StateSyncRequest::AddNewBlock(block_number, sync_block) => {
StateSyncResponse::AddNewBlock(
self.new_block_sender
.send((block_number, sync_block))
.await
.map_err(|_| StateSyncError::P2PSyncClientError),
)
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
mod test;

use async_trait::async_trait;
use futures::channel::mpsc::Receiver;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use papyrus_network::network_manager::{self, NetworkError};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_storage::{open_storage, StorageReader};
use starknet_api::block::BlockNumber;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_state_sync_types::state_sync_types::SyncBlock;

use crate::config::StateSyncConfig;

Expand All @@ -37,7 +40,10 @@ impl ComponentStarter for StateSyncRunner {
}

impl StateSyncRunner {
pub fn new(config: StateSyncConfig) -> (Self, StorageReader) {
pub fn new(
config: StateSyncConfig,
new_block_receiver: Receiver<(BlockNumber, SyncBlock)>,
) -> (Self, StorageReader) {
let (storage_reader, storage_writer) =
open_storage(config.storage_config).expect("StateSyncRunner failed opening storage");

Expand Down Expand Up @@ -65,7 +71,7 @@ impl StateSyncRunner {
storage_reader.clone(),
storage_writer,
p2p_sync_client_channels,
futures::stream::pending().boxed(),
Box::pin(new_block_receiver),
);

let header_server_receiver = network_manager
Expand Down
2 changes: 2 additions & 0 deletions crates/starknet_state_sync_types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum StateSyncError {
// We put the string of the error instead.
#[error("Unexpected storage error: {0}")]
StorageError(String),
#[error("Failed to send message to P2pSyncClient")]
P2PSyncClientError,
}

impl From<StorageError> for StateSyncError {
Expand Down

0 comments on commit 6d8f02a

Please sign in to comment.