Skip to content

Commit

Permalink
doc strings and minor cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarreif committed Nov 8, 2024
1 parent de15c44 commit 2f17c08
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 187 deletions.
5 changes: 4 additions & 1 deletion crates/astria-auctioneer/src/auction/allocation_rule.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! The allocation rule is the mechanism by which the auction processes incoming bids and determines
//! the winner.
use super::Bundle;

pub(super) struct FirstPrice {
Expand All @@ -23,7 +25,8 @@ impl FirstPrice {
}
}

pub(crate) fn highest_bid(self) -> Option<Bundle> {
/// Returns the winner of the auction, if one exists.
pub(crate) fn winner(self) -> Option<Bundle> {
self.highest_bid
}
}
19 changes: 5 additions & 14 deletions crates/astria-auctioneer/src/auction/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use astria_core::{
RollupId,
},
};
use tokio::sync::{
mpsc,
oneshot,
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use super::{
Expand Down Expand Up @@ -59,20 +56,16 @@ impl Builder {
sequencer_chain_id,
} = self;

let (executed_block_tx, executed_block_rx) = oneshot::channel();
let (block_commitment_tx, block_commitment_rx) = oneshot::channel();
let (reorg_tx, reorg_rx) = oneshot::channel();
// TODO: get the capacity from config or something instead of using a magic number
// TODO: get the capacities from config or something instead of using a magic number
let (commands_tx, commands_rx) = mpsc::channel(16);
let (new_bundles_tx, new_bundles_rx) = mpsc::channel(16);

let auction = Auction {
metrics,
shutdown_token,
sequencer_grpc_client,
sequencer_abci_client,
start_processing_bids_rx: executed_block_rx,
start_timer_rx: block_commitment_rx,
abort_rx: reorg_rx,
commands_rx,
new_bundles_rx,
auction_id,
latency_margin,
Expand All @@ -85,9 +78,7 @@ impl Builder {
(
Handle {
new_bundles_tx,
start_processing_bids_tx: Some(executed_block_tx),
start_timer_tx: Some(block_commitment_tx),
abort_tx: Some(reorg_tx),
commands_tx,
},
auction,
)
Expand Down
16 changes: 9 additions & 7 deletions crates/astria-auctioneer/src/auction/manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// The auction Manager is responsible for managing running auction futures and their
/// associated handles.
use std::collections::HashMap;

use astria_core::{
Expand Down Expand Up @@ -110,7 +112,6 @@ pub(crate) struct Manager {
latency_margin: std::time::Duration,
running_auctions: JoinMap<Id, eyre::Result<()>>,
auction_handles: HashMap<Id, Handle>,
// TODO: hold the bundle stream here?
sequencer_key: SequencerKey,
fee_asset_denomination: asset::Denom,
sequencer_chain_id: String,
Expand Down Expand Up @@ -140,13 +141,14 @@ impl Manager {
}

pub(crate) fn abort_auction(&mut self, auction_id: Id) -> eyre::Result<()> {
// TODO: this should return an option in case the auction returned before being aborted
let handle = self
.auction_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?;

handle.abort().expect("should only abort once per auction");
handle
.try_abort()
.wrap_err("failed to send command to abort auction")?;
Ok(())
}

Expand All @@ -159,7 +161,7 @@ impl Manager {

handle
.start_timer()
.expect("should only start timer once per auction");
.wrap_err("failed to send command to start timer to auction")?;

Ok(())
}
Expand All @@ -173,7 +175,7 @@ impl Manager {

handle
.start_processing_bids()
.expect("should only start processing bids once per auction");
.wrap_err("failed to send command to start processing bids")?;
Ok(())
}

Expand All @@ -187,10 +189,10 @@ impl Manager {

pub(crate) async fn join_next(&mut self) -> Option<(Id, eyre::Result<()>)> {
if let Some((auction_id, result)) = self.running_auctions.join_next().await {
// TODO: get rid of this expect?
// TODO: get rid of this expect somehow
self.auction_handles
.remove(&auction_id)
.expect("unable to get handle for the given auction");
.expect("handle should always exist for running auction");

Some((auction_id, flatten_result(result)))
} else {
Expand Down
149 changes: 86 additions & 63 deletions crates/astria-auctioneer/src/auction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,42 @@
//! The Auction is repsonsible for running an auction for a given block. An auction advances through
//! the following states, controlled via the `commands_rx` channel received:
//! 1. The auction is initialized but not yet started (i.e. no commands have been received).
//! 2. After receiving a `Command::StartProcessingBids`, the auction will start processing incoming
//! bundles from `new_bundles_rx`.
//! 3. After receiving a `Command::StartTimer`, the auction will set a timer for `latency_margin`
//! (denominated in milliseconds).
//! 4. Once the timer expires, the auction will choose a winner based on its `AllocationRule` and
//! submit it to the sequencer.
//!
//! ## Aborting an Auction
//! The auction may also be aborted at any point before the timer expires by receiving a
//! `Command::Abort`. This will cause the auction to return early without submitting a winner,
//! effectively discarding any bundles that were processed.
//! This is used for leveraging optimsitic execution, running an auction for block data that has
//! been proposed in the sequencer network's CometBFT but not yet finalized.
//! We assume that most proposals are adopted in CometBFT, allowing us to buy a few hundred
//! milliseconds before they are finalized. However, if multiple rounds of voting invalidate a
//! proposal, we can abort the auction and avoid submitting a potentially invalid bundle. In this
//! case, the auction will abort and a new one will be created for the newly processed proposal
//! (which will be received by the Optimistic Executor via the optimistic block stream).
//!
//! ## Auction Result
//! The auction result is a `Bundle` that is signed by the Auctioneer and submitted to the rollup
//! via the sequencer. The rollup defines a trusted Auctioneer address that it allows to submit
//! bundles, and thus must verify the Auctioneer's signature over this bundle.
//!
//! Since the sequencer does not include the transaction signer's metadata with the `RollupData`
//! events that it saves in its block data, the Auctioneer must include this metadata in its
//! `RollupDataSubmission`s. This is done by wrapping the winning `Bundle` object in an
//! `AuctionResult` object, which is then serialized into the `RollupDataSubmission`.
//!
//! ## Submission to Sequencer
//! The auction will submit the winning bundle to the sequencer via the `broadcast_tx_sync` ABCI(?)
//! endpoint.
//! In order to save time on fetching a nonce, the auction will fetch the next pending nonce as soon
//! as it received the signal to start the timer. This corresponds to the sequencer block being
//! committed, thus providing the latest pending nonce.
mod builder;
use std::time::Duration;

Expand All @@ -17,7 +56,6 @@ use astria_eyre::eyre::{
self,
eyre,
Context,
ContextCompat,
OptionExt as _,
};
pub(crate) use builder::Builder;
Expand All @@ -29,10 +67,7 @@ use sequencer_client::{
use telemetry::display::base64;
use tokio::{
select,
sync::{
mpsc,
oneshot,
},
sync::mpsc,
};
use tokio_util::sync::CancellationToken;
use tracing::{
Expand Down Expand Up @@ -79,37 +114,32 @@ enum Command {

pub(crate) struct Handle {
commands_tx: mpsc::Sender<Command>,
start_processing_bids_tx: Option<oneshot::Sender<()>>,
start_timer_tx: Option<oneshot::Sender<()>>,
abort_tx: Option<oneshot::Sender<()>>,
new_bundles_tx: mpsc::Sender<Bundle>,
}

impl Handle {
pub(crate) fn abort(&mut self) -> eyre::Result<()> {
pub(crate) fn try_abort(&mut self) -> eyre::Result<()> {
let _ = self
.abort_tx
.take()
.ok_or_eyre("should only send reorg signal to a given auction once")?;
.commands_tx
.try_send(Command::Abort)
.wrap_err("unable to send abort command to auction")?;

Ok(())
}

pub(crate) fn start_processing_bids(&mut self) -> eyre::Result<()> {
let _ = self
.start_processing_bids_tx
.take()
.ok_or_eyre("should only send executed signal to a given auction once")?
.send(());
.commands_tx
.try_send(Command::StartProcessingBids)
.wrap_err("unable to send command to start processing bids to auction")?;
Ok(())
}

pub(crate) fn start_timer(&mut self) -> eyre::Result<()> {
let _ = self
.start_timer_tx
.take()
.ok_or_eyre("should only send block commitment signal to a given auction once")?
.send(());
.commands_tx
.try_send(Command::StartTimer)
.wrap_err("unable to send command to start time to auction")?;

Ok(())
}
Expand All @@ -123,7 +153,6 @@ impl Handle {
}
}

// TODO: should this be the same object as the auction?
pub(crate) struct Auction {
#[allow(dead_code)]
metrics: &'static Metrics,
Expand All @@ -133,12 +162,8 @@ pub(crate) struct Auction {
sequencer_grpc_client: SequencerServiceClient<tonic::transport::Channel>,
/// The sequencer's ABCI client, used for submitting transactions
sequencer_abci_client: sequencer_client::HttpClient,
/// Channel for receiving the executed block signal to start processing bundles
start_processing_bids_rx: oneshot::Receiver<()>,
/// Channel for receiving the block commitment signal to start the latency margin timer
start_timer_rx: oneshot::Receiver<()>,
/// Channel for receiving the reorg signal
abort_rx: oneshot::Receiver<()>,
/// Channel for receiving commands sent via the handle
commands_rx: mpsc::Receiver<Command>,
/// Channel for receiving new bundles
new_bundles_rx: mpsc::Receiver<Bundle>,
/// The time between receiving a block commitment
Expand All @@ -158,6 +183,7 @@ pub(crate) struct Auction {
impl Auction {
pub(crate) async fn run(mut self) -> eyre::Result<()> {
let mut latency_margin_timer = None;
// TODO: do we want to make this configurable to allow for more complex allocation rules?
let mut allocation_rule = FirstPrice::new();
let mut auction_is_open = false;

Expand All @@ -169,46 +195,40 @@ impl Auction {

() = self.shutdown_token.cancelled() => break Err(eyre!("received shutdown signal")),

signal = &mut self.abort_rx => {
match signal {
Ok(()) => {
break Err(eyre!("reorg signal received"))
}
Err(_) => {
return Err(eyre!("reorg signal channel closed"));
}
}
//
}

// get the auction winner when the timer expires
_ = async { latency_margin_timer.as_mut().unwrap() }, if latency_margin_timer.is_some() => {
break Ok(allocation_rule.highest_bid());
break Ok(allocation_rule.winner());
}

signal = &mut self.start_processing_bids_rx, if !auction_is_open => {
if let Err(e) = signal {
break Err(e).wrap_err("exec signal channel closed");
}
// set auction to open so it starts collecting bids
auction_is_open = true;
}

signal = &mut self.start_timer_rx, if auction_is_open => {
if let Err(e) = signal {
break Err(e).wrap_err("commit signal channel closed");
Some(cmd) = self.commands_rx.recv() => {
match cmd {
Command::Abort => {
// abort the auction early
// TODO: should this be an error?
break Err(eyre!("auction {id} received abort signal", id = base64(&self.auction_id)));
},
Command::StartProcessingBids => {
if auction_is_open {
break Err(eyre!("auction received signal to start processing bids twice"));
}
auction_is_open = true;
},
Command::StartTimer => {
if !auction_is_open {
break Err(eyre!("auction received signal to start timer before signal to start processing bids"));
}

// set the timer
latency_margin_timer = Some(tokio::time::sleep(self.latency_margin));

// we wait for commit because we want the pending nonce from the committed block
nonce_fetch = {
let client = self.sequencer_grpc_client.clone();
let address = self.sequencer_key.address().clone();
Some(tokio::task::spawn(async move { get_pending_nonce(client, address, self.metrics).await }))
};
}
}
// set the timer
latency_margin_timer = Some(tokio::time::sleep(self.latency_margin));

let client = self.sequencer_grpc_client.clone();
let address = self.sequencer_key.address().clone();

// we wait for commit because we want the pending nonce from after the commit
// TODO: fix lifetime issue with passing metrics here?
nonce_fetch = Some(tokio::task::spawn(async move {
get_pending_nonce(client, address).await
}));
}

Some(bundle) = self.new_bundles_rx.recv(), if auction_is_open => {
Expand Down Expand Up @@ -284,6 +304,8 @@ impl Auction {
async fn get_pending_nonce(
client: SequencerServiceClient<tonic::transport::Channel>,
address: Address,
// TODO: emit metrics here
#[allow(unused_variables)] metrics: &'static Metrics,
) -> eyre::Result<u32> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
Expand Down Expand Up @@ -330,7 +352,8 @@ async fn get_pending_nonce(
async fn submit_transaction(
client: sequencer_client::HttpClient,
transaction: Transaction,
_metrics: &'static Metrics,
// TODO: emit metrics here
#[allow(unused_variables)] metrics: &'static Metrics,
) -> eyre::Result<tx_sync::Response> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
Expand Down
Loading

0 comments on commit 2f17c08

Please sign in to comment.