Skip to content

Commit

Permalink
requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanoroshiba committed Dec 9, 2024
1 parent a7e6ef5 commit d113119
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
13 changes: 9 additions & 4 deletions crates/astria-sequencer-relayer/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl Relayer {
// future to forward a sequencer block to the celestia-submission-task.
// gets set in the select-loop if the task is at capacity.
let mut forward_once_free: Fuse<
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<SequencerBlock>>>,
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<Box<SequencerBlock>>>>,
> = Fuse::terminated();

self.state.set_ready();
Expand Down Expand Up @@ -276,14 +276,19 @@ impl Relayer {
}

#[instrument(skip_all, fields(%height))]
#[expect(
clippy::type_complexity,
reason = "complexity comes from the error type, which is boxed to avoid a large Err \
variant of the result"
)]
fn forward_block_for_submission(
&self,
height: SequencerHeight,
block: SequencerBlock,
block_stream: &mut read::BlockStream,
submitter: write::BlobSubmitterHandle,
forward: &mut Fuse<
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<SequencerBlock>>>,
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<Box<SequencerBlock>>>>,
>,
) -> eyre::Result<()> {
assert!(
Expand All @@ -292,7 +297,7 @@ impl Relayer {
congested and this future is in-flight",
);

if let Err(error) = submitter.try_send(block) {
if let Err(error) = submitter.try_send(Box::new(block)) {
debug!(
// Just print the error directly: TrySendError has no cause chain.
%error,
Expand All @@ -302,7 +307,7 @@ impl Relayer {
block_stream.pause();
debug!("block stream paused");

match *error {
match error {
TrySendError::Full(block) => {
*forward = async move { submitter.send(block).await }.boxed().fuse();
}
Expand Down
16 changes: 8 additions & 8 deletions crates/astria-sequencer-relayer/src/relayer/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct StartedSubmissionAndFee {

#[derive(Clone)]
pub(super) struct BlobSubmitterHandle {
tx: mpsc::Sender<SequencerBlock>,
tx: mpsc::Sender<Box<SequencerBlock>>,
}

impl BlobSubmitterHandle {
Expand All @@ -95,18 +95,18 @@ impl BlobSubmitterHandle {
/// This is a thin wrapper around [`mpsc::Sender::try_send`].
pub(super) fn try_send(
&self,
block: SequencerBlock,
) -> Result<(), Box<TrySendError<SequencerBlock>>> {
self.tx.try_send(block).map_err(Box::new)
block: Box<SequencerBlock>,
) -> Result<(), TrySendError<Box<SequencerBlock>>> {
self.tx.try_send(block)
}

/// Sends a block to the blob submitter.
///
/// This is a thin wrapper around [`mpsc::Sender::send`].
pub(super) async fn send(
&self,
block: SequencerBlock,
) -> Result<(), SendError<SequencerBlock>> {
block: Box<SequencerBlock>,
) -> Result<(), SendError<Box<SequencerBlock>>> {
self.tx.send(block).await
}
}
Expand All @@ -116,7 +116,7 @@ pub(super) struct BlobSubmitter {
client_builder: CelestiaClientBuilder,

/// The channel over which sequencer blocks are received.
blocks: mpsc::Receiver<SequencerBlock>,
blocks: mpsc::Receiver<Box<SequencerBlock>>,

/// The accumulator of all data that will be submitted to Celestia on the next submission.
next_submission: NextSubmission,
Expand Down Expand Up @@ -253,7 +253,7 @@ impl BlobSubmitter {
sequencer_height = %block.height(),
"skipping sequencer block as already included in previous submission"
));
} else if let Err(error) = self.add_sequencer_block_to_next_submission(block) {
} else if let Err(error) = self.add_sequencer_block_to_next_submission(*block) {
break Err(error).wrap_err(
"critically failed adding Sequencer block to next submission"
);
Expand Down

0 comments on commit d113119

Please sign in to comment.