diff --git a/crates/astria-sequencer-relayer/src/relayer/mod.rs b/crates/astria-sequencer-relayer/src/relayer/mod.rs index e4e58e0cb7..6f7afb299c 100644 --- a/crates/astria-sequencer-relayer/src/relayer/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/mod.rs @@ -83,6 +83,9 @@ use crate::{ IncludeRollup, }; +type ForwardFut<'a> = + Fuse>>>>; + pub(crate) struct Relayer { /// A token to notify relayer that it should shut down. #[expect( @@ -172,7 +175,7 @@ impl Relayer { // future to forward a sequencer block to the celestia-submission-task. // gets set in the select-loop if the task is at capacity. let mut forward_once_free: Fuse< - BoxFuture>>, + BoxFuture>>>, > = Fuse::terminated(); self.state.set_ready(); @@ -282,9 +285,7 @@ impl Relayer { block: SequencerBlock, block_stream: &mut read::BlockStream, submitter: write::BlobSubmitterHandle, - forward: &mut Fuse< - BoxFuture>>, - >, + forward: &mut ForwardFut, ) -> eyre::Result<()> { assert!( forward.is_terminated(), @@ -292,7 +293,7 @@ impl Relayer { congested and this future is in-flight", ); - if let Err(error) = submitter.try_send(block) { + if let Err(error) = submitter.try_send(Box::new(block)) { debug!( // Just print the error directly: TrySendError has no cause chain. %error, diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index bed9880a20..aebb0debd4 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -86,7 +86,7 @@ struct StartedSubmissionAndFee { #[derive(Clone)] pub(super) struct BlobSubmitterHandle { - tx: mpsc::Sender, + tx: mpsc::Sender>, } impl BlobSubmitterHandle { @@ -95,8 +95,8 @@ impl BlobSubmitterHandle { /// This is a thin wrapper around [`mpsc::Sender::try_send`]. pub(super) fn try_send( &self, - block: SequencerBlock, - ) -> Result<(), TrySendError> { + block: Box, + ) -> Result<(), TrySendError>> { self.tx.try_send(block) } @@ -105,8 +105,8 @@ impl BlobSubmitterHandle { /// This is a thin wrapper around [`mpsc::Sender::send`]. pub(super) async fn send( &self, - block: SequencerBlock, - ) -> Result<(), SendError> { + block: Box, + ) -> Result<(), SendError>> { self.tx.send(block).await } } @@ -116,7 +116,7 @@ pub(super) struct BlobSubmitter { client_builder: CelestiaClientBuilder, /// The channel over which sequencer blocks are received. - blocks: mpsc::Receiver, + blocks: mpsc::Receiver>, /// The accumulator of all data that will be submitted to Celestia on the next submission. next_submission: NextSubmission, @@ -253,7 +253,7 @@ impl BlobSubmitter { sequencer_height = %block.height(), "skipping sequencer block as already included in previous submission" )); - } else if let Err(error) = self.add_sequencer_block_to_next_submission(block) { + } else if let Err(error) = self.add_sequencer_block_to_next_submission(*block) { break Err(error).wrap_err( "critically failed adding Sequencer block to next submission" );