diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 85146dc645..0c94659853 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -512,21 +512,23 @@ impl RunningReader { #[instrument(skip_all)] fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let celestia_height = block.celestia_height; - match self.executor.try_send_firm_block(block) { + match self.executor.try_send_firm_block(Box::new(block)) { Ok(()) => self.advance_reference_celestia_height(celestia_height), Err(FirmTrySendError::Channel { - source: mpsc::error::TrySendError::Full(block), - }) => { - trace!( - "executor channel is full; rescheduling block fetch until the channel opens up" - ); - self.enqueued_block = enqueue_block(self.executor.clone(), block).boxed().fuse(); - } - - Err(FirmTrySendError::Channel { - source: mpsc::error::TrySendError::Closed(_), - }) => bail!("exiting because executor channel is closed"), - + source, + }) => match source { + mpsc::error::TrySendError::Full(block) => { + trace!( + "executor channel is full; rescheduling block fetch until the channel \ + opens up" + ); + self.enqueued_block = + enqueue_block(self.executor.clone(), *block).boxed().fuse(); + } + mpsc::error::TrySendError::Closed(_) => { + bail!("exiting because executor channel is closed"); + } + }, Err(FirmTrySendError::NotSet) => bail!( "exiting because executor was configured without firm commitments; this Celestia \ reader should have never been started" @@ -665,7 +667,7 @@ async fn enqueue_block( block: ReconstructedBlock, ) -> Result { let celestia_height = block.celestia_height; - executor.send_firm_block(block).await?; + executor.send_firm_block(Box::new(block)).await?; Ok(celestia_height) } diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index 62559f30b9..daf53e5835 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -11,6 +11,7 @@ use super::{ state, Executor, Handle, + ReconstructedBlock, StateNotInit, }; use crate::{ @@ -44,7 +45,7 @@ impl Builder { let mut firm_block_tx = None; let mut firm_block_rx = None; if mode.is_with_firm() { - let (tx, rx) = mpsc::channel(16); + let (tx, rx) = mpsc::channel::>(16); firm_block_tx = Some(tx); firm_block_rx = Some(rx); } diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 8c3155ca8b..85f188ff8a 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -74,7 +74,7 @@ pub(crate) enum FirmSendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: mpsc::error::SendError, + source: mpsc::error::SendError>, }, } @@ -85,7 +85,7 @@ pub(crate) enum FirmTrySendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: mpsc::error::TrySendError, + source: mpsc::error::TrySendError>, }, } @@ -115,7 +115,7 @@ pub(crate) enum SoftTrySendError { /// information. #[derive(Debug, Clone)] pub(crate) struct Handle { - firm_blocks: Option>, + firm_blocks: Option>>, soft_blocks: Option>, state: StateReceiver, _state_init: TStateInit, @@ -146,20 +146,18 @@ impl Handle { #[instrument(skip_all, err)] pub(crate) async fn send_firm_block( self, - block: ReconstructedBlock, + block: Box, ) -> Result<(), FirmSendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?; - sender.send(block).await?; - Ok(()) + Ok(sender.send(block).await?) } pub(crate) fn try_send_firm_block( &self, - block: ReconstructedBlock, + block: Box, ) -> Result<(), FirmTrySendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?; - sender.try_send(block)?; - Ok(()) + Ok(sender.try_send(block)?) } #[instrument(skip_all, err)] @@ -226,7 +224,7 @@ pub(crate) struct Executor { /// The channel of which this executor receives blocks for executing /// firm commitments. /// Only set if `mode` is `FirmOnly` or `SoftAndFirm`. - firm_blocks: Option>, + firm_blocks: Option>>, /// The channel of which this executor receives blocks for executing /// soft commitments. @@ -456,9 +454,9 @@ impl Executor { block.height = block.sequencer_height().value(), err, ))] - async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { + async fn execute_firm(&mut self, block: Box) -> eyre::Result<()> { let celestia_height = block.celestia_height; - let executable_block = ExecutableBlock::from_reconstructed(block); + let executable_block = ExecutableBlock::from_reconstructed(*block); let expected_height = self.state.next_expected_firm_sequencer_height(); let block_height = executable_block.height; ensure!(