Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(conductor): send boxed objects over channels #1865

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{

Check warning on line 1 in crates/astria-conductor/src/celestia/mod.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.
cmp::max,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -512,21 +512,23 @@
#[instrument(skip_all)]
fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
match self.executor.try_send_firm_block(block) {
match self.executor.try_send_firm_block(Box::new(block)) {
Ok(()) => self.advance_reference_celestia_height(celestia_height),
Err(FirmTrySendError::Channel {
source: mpsc::error::TrySendError::Full(block),
}) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel opens up"
);
self.enqueued_block = enqueue_block(self.executor.clone(), block).boxed().fuse();
}

Err(FirmTrySendError::Channel {
source: mpsc::error::TrySendError::Closed(_),
}) => bail!("exiting because executor channel is closed"),

source,
}) => match source {
mpsc::error::TrySendError::Full(block) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel \
opens up"
);
self.enqueued_block =
enqueue_block(self.executor.clone(), *block).boxed().fuse();
}
mpsc::error::TrySendError::Closed(_) => {
bail!("exiting because executor channel is closed");
}
},
Err(FirmTrySendError::NotSet) => bail!(
"exiting because executor was configured without firm commitments; this Celestia \
reader should have never been started"
Expand Down Expand Up @@ -665,7 +667,7 @@
block: ReconstructedBlock,
) -> Result<u64, FirmSendError> {
let celestia_height = block.celestia_height;
executor.send_firm_block(block).await?;
executor.send_firm_block(Box::new(block)).await?;
Ok(celestia_height)
}

Expand Down
3 changes: 2 additions & 1 deletion crates/astria-conductor/src/executor/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;

Check warning on line 1 in crates/astria-conductor/src/executor/builder.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.

use astria_eyre::eyre::{
self,
Expand All @@ -11,6 +11,7 @@
state,
Executor,
Handle,
ReconstructedBlock,
StateNotInit,
};
use crate::{
Expand Down Expand Up @@ -44,7 +45,7 @@
let mut firm_block_tx = None;
let mut firm_block_rx = None;
if mode.is_with_firm() {
let (tx, rx) = mpsc::channel(16);
let (tx, rx) = mpsc::channel::<Box<ReconstructedBlock>>(16);
firm_block_tx = Some(tx);
firm_block_rx = Some(rx);
}
Expand Down
22 changes: 10 additions & 12 deletions crates/astria-conductor/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;

Check warning on line 1 in crates/astria-conductor/src/executor/mod.rs

View workflow job for this annotation

GitHub Actions / Code Freeze

Code Freeze in Effect - Bypassed

This file is under code freeze.

use astria_core::{
execution::v1::{
Expand Down Expand Up @@ -74,7 +74,7 @@
#[error("failed sending blocks to executor")]
Channel {
#[from]
source: mpsc::error::SendError<ReconstructedBlock>,
source: mpsc::error::SendError<Box<ReconstructedBlock>>,
},
}

Expand All @@ -85,7 +85,7 @@
#[error("failed sending blocks to executor")]
Channel {
#[from]
source: mpsc::error::TrySendError<ReconstructedBlock>,
source: mpsc::error::TrySendError<Box<ReconstructedBlock>>,
},
}

Expand Down Expand Up @@ -115,7 +115,7 @@
/// information.
#[derive(Debug, Clone)]
pub(crate) struct Handle<TStateInit = StateNotInit> {
firm_blocks: Option<mpsc::Sender<ReconstructedBlock>>,
firm_blocks: Option<mpsc::Sender<Box<ReconstructedBlock>>>,
soft_blocks: Option<channel::Sender<FilteredSequencerBlock>>,
state: StateReceiver,
_state_init: TStateInit,
Expand Down Expand Up @@ -146,20 +146,18 @@
#[instrument(skip_all, err)]
pub(crate) async fn send_firm_block(
self,
block: ReconstructedBlock,
block: Box<ReconstructedBlock>,
) -> Result<(), FirmSendError> {
let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?;
sender.send(block).await?;
Ok(())
Ok(sender.send(block).await?)
}

pub(crate) fn try_send_firm_block(
&self,
block: ReconstructedBlock,
block: Box<ReconstructedBlock>,
) -> Result<(), FirmTrySendError> {
let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?;
sender.try_send(block)?;
Ok(())
Ok(sender.try_send(block)?)
}

#[instrument(skip_all, err)]
Expand Down Expand Up @@ -226,7 +224,7 @@
/// The channel of which this executor receives blocks for executing
/// firm commitments.
/// Only set if `mode` is `FirmOnly` or `SoftAndFirm`.
firm_blocks: Option<mpsc::Receiver<ReconstructedBlock>>,
firm_blocks: Option<mpsc::Receiver<Box<ReconstructedBlock>>>,

/// The channel of which this executor receives blocks for executing
/// soft commitments.
Expand Down Expand Up @@ -456,9 +454,9 @@
block.height = block.sequencer_height().value(),
err,
))]
async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
async fn execute_firm(&mut self, block: Box<ReconstructedBlock>) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
let executable_block = ExecutableBlock::from_reconstructed(block);
let executable_block = ExecutableBlock::from_reconstructed(*block);
let expected_height = self.state.next_expected_firm_sequencer_height();
let block_height = executable_block.height;
ensure!(
Expand Down
Loading