From 2fc75332781d31ca73630c506a34fae1055bc176 Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Fri, 6 Dec 2024 14:59:38 -0600 Subject: [PATCH 1/2] box large enum variant --- crates/astria-conductor/src/celestia/mod.rs | 26 +++++++++++---------- crates/astria-conductor/src/executor/mod.rs | 8 +++---- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 85146dc645..9900820853 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -515,18 +515,20 @@ impl RunningReader { match self.executor.try_send_firm_block(block) { Ok(()) => self.advance_reference_celestia_height(celestia_height), Err(FirmTrySendError::Channel { - source: mpsc::error::TrySendError::Full(block), - }) => { - trace!( - "executor channel is full; rescheduling block fetch until the channel opens up" - ); - self.enqueued_block = enqueue_block(self.executor.clone(), block).boxed().fuse(); - } - - Err(FirmTrySendError::Channel { - source: mpsc::error::TrySendError::Closed(_), - }) => bail!("exiting because executor channel is closed"), - + source, + }) => match *source { + mpsc::error::TrySendError::Full(block) => { + trace!( + "executor channel is full; rescheduling block fetch until the channel \ + opens up" + ); + self.enqueued_block = + enqueue_block(self.executor.clone(), block).boxed().fuse(); + } + mpsc::error::TrySendError::Closed(_) => { + bail!("exiting because executor channel is closed"); + } + }, Err(FirmTrySendError::NotSet) => bail!( "exiting because executor was configured without firm commitments; this Celestia \ reader should have never been started" diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 8c3155ca8b..27e26de835 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -74,7 +74,7 @@ pub(crate) enum FirmSendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: mpsc::error::SendError, + source: Box>, }, } @@ -85,7 +85,7 @@ pub(crate) enum FirmTrySendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: mpsc::error::TrySendError, + source: Box>, }, } @@ -149,7 +149,7 @@ impl Handle { block: ReconstructedBlock, ) -> Result<(), FirmSendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?; - sender.send(block).await?; + sender.send(block).await.map_err(Box::new)?; Ok(()) } @@ -158,7 +158,7 @@ impl Handle { block: ReconstructedBlock, ) -> Result<(), FirmTrySendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?; - sender.try_send(block)?; + sender.try_send(block).map_err(Box::new)?; Ok(()) } From 09f24b2df5d2fd06059747c4a98d62c6a24bdf19 Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Mon, 9 Dec 2024 13:40:39 -0600 Subject: [PATCH 2/2] requested changes --- crates/astria-conductor/src/celestia/mod.rs | 8 +++---- .../astria-conductor/src/executor/builder.rs | 3 ++- crates/astria-conductor/src/executor/mod.rs | 22 +++++++++---------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 9900820853..0c94659853 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -512,18 +512,18 @@ impl RunningReader { #[instrument(skip_all)] fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let celestia_height = block.celestia_height; - match self.executor.try_send_firm_block(block) { + match self.executor.try_send_firm_block(Box::new(block)) { Ok(()) => self.advance_reference_celestia_height(celestia_height), Err(FirmTrySendError::Channel { source, - }) => match *source { + }) => match source { mpsc::error::TrySendError::Full(block) => { trace!( "executor channel is full; rescheduling block fetch until the channel \ opens up" ); self.enqueued_block = - enqueue_block(self.executor.clone(), block).boxed().fuse(); + enqueue_block(self.executor.clone(), *block).boxed().fuse(); } mpsc::error::TrySendError::Closed(_) => { bail!("exiting because executor channel is closed"); @@ -667,7 +667,7 @@ async fn enqueue_block( block: ReconstructedBlock, ) -> Result { let celestia_height = block.celestia_height; - executor.send_firm_block(block).await?; + executor.send_firm_block(Box::new(block)).await?; Ok(celestia_height) } diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index 62559f30b9..daf53e5835 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -11,6 +11,7 @@ use super::{ state, Executor, Handle, + ReconstructedBlock, StateNotInit, }; use crate::{ @@ -44,7 +45,7 @@ impl Builder { let mut firm_block_tx = None; let mut firm_block_rx = None; if mode.is_with_firm() { - let (tx, rx) = mpsc::channel(16); + let (tx, rx) = mpsc::channel::>(16); firm_block_tx = Some(tx); firm_block_rx = Some(rx); } diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 27e26de835..85f188ff8a 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -74,7 +74,7 @@ pub(crate) enum FirmSendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: Box>, + source: mpsc::error::SendError>, }, } @@ -85,7 +85,7 @@ pub(crate) enum FirmTrySendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: Box>, + source: mpsc::error::TrySendError>, }, } @@ -115,7 +115,7 @@ pub(crate) enum SoftTrySendError { /// information. #[derive(Debug, Clone)] pub(crate) struct Handle { - firm_blocks: Option>, + firm_blocks: Option>>, soft_blocks: Option>, state: StateReceiver, _state_init: TStateInit, @@ -146,20 +146,18 @@ impl Handle { #[instrument(skip_all, err)] pub(crate) async fn send_firm_block( self, - block: ReconstructedBlock, + block: Box, ) -> Result<(), FirmSendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?; - sender.send(block).await.map_err(Box::new)?; - Ok(()) + Ok(sender.send(block).await?) } pub(crate) fn try_send_firm_block( &self, - block: ReconstructedBlock, + block: Box, ) -> Result<(), FirmTrySendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?; - sender.try_send(block).map_err(Box::new)?; - Ok(()) + Ok(sender.try_send(block)?) } #[instrument(skip_all, err)] @@ -226,7 +224,7 @@ pub(crate) struct Executor { /// The channel of which this executor receives blocks for executing /// firm commitments. /// Only set if `mode` is `FirmOnly` or `SoftAndFirm`. - firm_blocks: Option>, + firm_blocks: Option>>, /// The channel of which this executor receives blocks for executing /// soft commitments. @@ -456,9 +454,9 @@ impl Executor { block.height = block.sequencer_height().value(), err, ))] - async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { + async fn execute_firm(&mut self, block: Box) -> eyre::Result<()> { let celestia_height = block.celestia_height; - let executable_block = ExecutableBlock::from_reconstructed(block); + let executable_block = ExecutableBlock::from_reconstructed(*block); let expected_height = self.state.next_expected_firm_sequencer_height(); let block_height = executable_block.height; ensure!(