Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sequencer-relayer): change blob submitter to use boxed blocks #1863

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions crates/astria-sequencer-relayer/src/relayer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{

Check warning on line 1 in crates/astria-sequencer-relayer/src/relayer/mod.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.
path::PathBuf,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -83,6 +83,9 @@
IncludeRollup,
};

type ForwardFut<'a> =
Fuse<BoxFuture<'a, Result<(), tokio::sync::mpsc::error::SendError<Box<SequencerBlock>>>>>;

pub(crate) struct Relayer {
/// A token to notify relayer that it should shut down.
#[expect(
Expand Down Expand Up @@ -172,7 +175,7 @@
// future to forward a sequencer block to the celestia-submission-task.
// gets set in the select-loop if the task is at capacity.
let mut forward_once_free: Fuse<
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<SequencerBlock>>>,
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<Box<SequencerBlock>>>>,
> = Fuse::terminated();

self.state.set_ready();
Expand Down Expand Up @@ -282,17 +285,15 @@
block: SequencerBlock,
block_stream: &mut read::BlockStream,
submitter: write::BlobSubmitterHandle,
forward: &mut Fuse<
BoxFuture<Result<(), tokio::sync::mpsc::error::SendError<SequencerBlock>>>,
>,
forward: &mut ForwardFut,
) -> eyre::Result<()> {
assert!(
forward.is_terminated(),
"block stream must be paused and not yield blocks when the blob submitter is \
congested and this future is in-flight",
);

if let Err(error) = submitter.try_send(block) {
if let Err(error) = submitter.try_send(Box::new(block)) {
debug!(
// Just print the error directly: TrySendError has no cause chain.
%error,
Expand Down
14 changes: 7 additions & 7 deletions crates/astria-sequencer-relayer/src/relayer/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! A task writing sequencer blocks to Celestia.

Check warning on line 1 in crates/astria-sequencer-relayer/src/relayer/write/mod.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.
//!
//! [`BlobSubmitter`] receives [`SequencerBlock`]s over a channel,
//! converts them to Celestia [`Blob`]s, and writes them to Celestia
Expand Down Expand Up @@ -86,7 +86,7 @@

#[derive(Clone)]
pub(super) struct BlobSubmitterHandle {
tx: mpsc::Sender<SequencerBlock>,
tx: mpsc::Sender<Box<SequencerBlock>>,
}

impl BlobSubmitterHandle {
Expand All @@ -95,8 +95,8 @@
/// This is a thin wrapper around [`mpsc::Sender::try_send`].
pub(super) fn try_send(
&self,
block: SequencerBlock,
) -> Result<(), TrySendError<SequencerBlock>> {
block: Box<SequencerBlock>,
) -> Result<(), TrySendError<Box<SequencerBlock>>> {
self.tx.try_send(block)
}

Expand All @@ -105,8 +105,8 @@
/// This is a thin wrapper around [`mpsc::Sender::send`].
pub(super) async fn send(
&self,
block: SequencerBlock,
) -> Result<(), SendError<SequencerBlock>> {
block: Box<SequencerBlock>,
) -> Result<(), SendError<Box<SequencerBlock>>> {
self.tx.send(block).await
}
}
Expand All @@ -116,7 +116,7 @@
client_builder: CelestiaClientBuilder,

/// The channel over which sequencer blocks are received.
blocks: mpsc::Receiver<SequencerBlock>,
blocks: mpsc::Receiver<Box<SequencerBlock>>,

/// The accumulator of all data that will be submitted to Celestia on the next submission.
next_submission: NextSubmission,
Expand Down Expand Up @@ -253,7 +253,7 @@
sequencer_height = %block.height(),
"skipping sequencer block as already included in previous submission"
));
} else if let Err(error) = self.add_sequencer_block_to_next_submission(block) {
} else if let Err(error) = self.add_sequencer_block_to_next_submission(*block) {
break Err(error).wrap_err(
"critically failed adding Sequencer block to next submission"
);
Expand Down
Loading