Skip to content

Commit

Permalink
cancel auctions instead of aborting them
Browse files Browse the repository at this point in the history
  • Loading branch information
SuperFluffy committed Dec 11, 2024
1 parent 82e401a commit 779b36f
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
/// associated handles.
use astria_core::{
primitive::v1::{
asset,
RollupId,
asset,
},
sequencerblock::v1::block::FilteredSequencerBlock,
};
use tokio::sync::{
mpsc,
oneshot,
};
use tokio_util::sync::CancellationToken;

use super::{
Auction,
Expand All @@ -29,6 +30,7 @@ pub(in crate::auctioneer::inner) struct Factory {
pub(in crate::auctioneer::inner) sequencer_chain_id: String,
pub(in crate::auctioneer::inner) rollup_id: RollupId,
pub(in crate::auctioneer::inner) pending_nonce: PendingNonceSubscriber,
pub(in crate::auctioneer::inner) cancellation_token: CancellationToken,
}

impl Factory {
Expand All @@ -45,6 +47,7 @@ impl Factory {
let (start_timer_tx, start_timer_rx) = oneshot::channel();
let (bundles_tx, bundles_rx) = mpsc::unbounded_channel();

let cancellation_token = self.cancellation_token.child_token();
let auction = Worker {
sequencer_abci_client: self.sequencer_abci_client.clone(),
start_bids: Some(start_bids_rx),
Expand All @@ -57,6 +60,7 @@ impl Factory {
sequencer_chain_id: self.sequencer_chain_id.clone(),
rollup_id: self.rollup_id,
pending_nonce: self.pending_nonce.clone(),
cancellation_token: cancellation_token.clone(),
};

Auction {
Expand All @@ -67,6 +71,7 @@ impl Factory {
start_bids: Some(start_bids_tx),
start_timer: Some(start_timer_tx),
bundles: bundles_tx,
cancellation_token,
worker: tokio::task::spawn(auction.run()),
}
}
Expand Down
44 changes: 38 additions & 6 deletions crates/astria-auctioneer/src/auctioneer/inner/auction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,27 @@
//! as it received the signal to start the timer. This corresponds to the sequencer block being
//! committed, thus providing the latest pending nonce.
use std::sync::Arc;
use std::{
fmt::Display,
sync::Arc,
};

use astria_core::{
self,
sequencerblock::v1::block::BlockHash,
};
use astria_eyre::eyre::{
self,
Context,
bail,
ensure,
eyre,
Context,
};
use futures::{
Future,
FutureExt as _,
};
use sequencer_client::tendermint_rpc::endpoint::broadcast::tx_sync;
use telemetry::display::base64;
use tokio::{
sync::{
Expand All @@ -62,13 +66,13 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::instrument;

use super::PendingNonceSubscriber;
use crate::{
block::Commitment,
bundle::Bundle,
flatten_join_result,
sequencer_key::SequencerKey,
};

Expand Down Expand Up @@ -105,14 +109,19 @@ pub(super) struct Auction {
start_bids: Option<oneshot::Sender<()>>,
start_timer: Option<oneshot::Sender<()>>,
bundles: mpsc::UnboundedSender<Arc<Bundle>>,
worker: JoinHandle<eyre::Result<()>>,
cancellation_token: CancellationToken,
worker: JoinHandle<eyre::Result<Summary>>,
}

impl Auction {
/// Aborts the spawned worker task by calling `abort` on its join handle.
pub(super) fn abort(&self) {
self.worker.abort();
}

/// Signals cancellation on this auction's cancellation token.
///
/// Unlike [`Auction::abort`], this does not touch the worker's join handle; it only
/// cancels the token.
// NOTE(review): presumably the worker holds a clone of this token and reacts to it —
// confirm against the factory that constructs both.
pub(super) fn cancel(&self) {
self.cancellation_token.cancel()
}

/// Returns a reference to the ID of this auction.
pub(in crate::auctioneer::inner) fn id(&self) -> &Id {
&self.id
}
Expand Down Expand Up @@ -206,13 +215,36 @@ impl Auction {
}

impl Future for Auction {
type Output = (Id, eyre::Result<()>);
type Output = (Id, Result<(), tokio::task::JoinError>);

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let res = std::task::ready!(self.worker.poll_unpin(cx));
std::task::Poll::Ready((self.id, flatten_join_result(res)))
std::task::Poll::Ready((self.id, res.map(|_| ())))
}
}

/// The ways an auction can end without an error.
pub(super) enum Summary {
/// A cancellation signal was received while the auction loop was still running.
CancelledDuringAuction,
/// The auction finished without bids.
NoBids,
/// The auction winner was submitted; carries the `tx_sync` response, whose ABCI code and
/// log are shown by the `Display` implementation.
Submitted(tx_sync::Response),
}

/// Renders a [`Summary`] as a single human-readable sentence, one per outcome.
impl Display for Summary {
    /// Writes the sentence describing this outcome to `f`.
    ///
    /// For `Submitted`, the message includes the ABCI code and log of the response.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Summary::CancelledDuringAuction => {
                f.write_str("received cancellation signal during auction loop")
            }
            Summary::NoBids => f.write_str("auction finished without bids"),
            // Include both the code and the log so the response is visible in the message.
            Summary::Submitted(rsp) => write!(
                f,
                "auction winner submitted; Sequencer responded with ABCI code `{}`, log `{}`",
                rsp.code.value(),
                rsp.log,
            ),
        }
    }
}
152 changes: 96 additions & 56 deletions crates/astria-auctioneer/src/auctioneer/inner/auction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,30 @@ use std::{
};

use astria_core::primitive::v1::{
asset,
RollupId,
asset,
};
use astria_eyre::eyre::{
self,
bail,
Context,
OptionExt as _,
bail,
};
use sequencer_client::SequencerClientExt;
use tokio::{
select,
sync::oneshot,
};
use tokio_util::sync::CancellationToken;
use tracing::{
error,
Level,
info,
instrument,
};

use super::{
allocation_rule::FirstPrice,
PendingNonceSubscriber,
Summary,
allocation_rule::FirstPrice,
};
use crate::{
bundle::Bundle,
Expand All @@ -92,37 +93,119 @@ pub(super) struct Worker {
/// Rollup ID to submit the auction result to
pub(super) rollup_id: RollupId,
pub(super) pending_nonce: PendingNonceSubscriber,
pub(super) cancellation_token: CancellationToken,
}

impl Worker {
#[instrument(skip_all, fields(id = %self.id))]
pub(super) async fn run(mut self) -> eyre::Result<()> {
/// Runs the auction to completion and reports how it ended.
///
/// Drives `run_auction_loop` until it finishes or the cancellation token fires:
/// - cancelled while the loop is running: returns `Summary::CancelledDuringAuction`;
/// - the loop finishes without a winner: returns `Summary::NoBids`;
/// - otherwise the winner is converted with `into_transaction_body`, signed with the
///   sequencer key, and submitted under a 30 second timeout, returning
///   `Summary::Submitted` with the response.
///
/// A cancellation received while waiting for the submission response is only logged; the
/// submission is still awaited until it completes or times out.
///
/// # Errors
/// Returns an error if the auction loop failed, if the submission request failed, or if
/// the submission timed out before a response was received.
// FIXME: consider using Valuable for the return case.
// See this discussion: https://github.com/tokio-rs/tracing/discussions/1906
#[instrument(
skip_all,
fields(id = %self.id),
err(level = Level::WARN, Display),
ret(Display),
)]
pub(super) async fn run(mut self) -> eyre::Result<Summary> {
// The token is cloned so that it does not borrow `self` while `run_auction_loop` holds
// `&mut self`. A `None` here means the token was cancelled before the loop finished.
let Some(auction_result) = self
.cancellation_token
.clone()
.run_until_cancelled(self.run_auction_loop())
.await
else {
return Ok(Summary::CancelledDuringAuction);
};
// An `Err` from the loop is propagated; `Ok(None)` means there was no winning bid.
let Some(winner) = auction_result.wrap_err("auction failed while waiting for bids")? else {
return Ok(Summary::NoBids);
};

// TODO: report the pending nonce that we ended up using.
let transaction = Arc::unwrap_or_clone(winner)
.into_transaction_body(
self.pending_nonce.get(),
self.rollup_id,
&self.sequencer_key,
self.fee_asset_denomination.clone(),
self.sequencer_chain_id,
)
.sign(self.sequencer_key.signing_key());

// NOTE: Submit fire-and-forget style. If the submission didn't make it in time,
// it's likely lost.
// TODO: We can consider providing a very tight retry mechanism. Maybe resubmit once
// if the response didn't take too long? But it's probably a bad idea to even try.
// Can we detect if a submission failed due to a bad nonce? In this case, we could
// immediately ("optimistically") submit with the most recent pending nonce (if the
// publisher updated it in the meantime) or just nonce + 1 (if it didn't yet update)?

// The submission is wrapped in a 30 second timeout and pinned so that it can be polled
// by reference across iterations of the loop below.
let submission_fut = {
let client = self.sequencer_abci_client.clone();
tokio::time::timeout(Duration::from_secs(30), async move {
client
.submit_transaction_sync(transaction)
.await
.wrap_err("submission request failed")
})
};
tokio::pin!(submission_fut);

loop {
select!(
// The `if` guard disables this arm once the token is cancelled, so the message is
// logged at most once and later iterations only wait on the submission.
() = self.cancellation_token.clone().cancelled_owned(), if !self.cancellation_token.is_cancelled() => {
info!(
"received cancellation token while waiting for Sequencer to respond to \
transaction submission; still waiting for submission until timeout"
);
}

// The outer `Err` is the timeout elapsing; the inner `Err` is a failed request.
res = &mut submission_fut => {
break match res
.wrap_err("submission of auction winner timed out before receiving a response from Sequencer")
{
Ok(Ok(rsp)) => Ok(Summary::Submitted(rsp)),
Err(err) | Ok(Err(err)) => Err(err),
}
}
)
}
}

async fn run_auction_loop(&mut self) -> eyre::Result<Option<Arc<Bundle>>> {
let mut latency_margin_timer = None;
// TODO: do we want to make this configurable to allow for more complex allocation rules?
let mut allocation_rule = FirstPrice::new();
let mut auction_is_open = false;

let auction_result = loop {
loop {
select! {
biased;

_ = async { latency_margin_timer.as_mut().unwrap() }, if latency_margin_timer.is_some() => {
info!("timer is up; bids left unprocessed: {}", self.bundles.len());
break allocation_rule.winner();
break Ok(allocation_rule.winner());
}

Ok(()) = async { self.start_bids.as_mut().unwrap().await }, if self.start_bids.is_some() => {
let mut channel = self.start_bids.take().expect("inside an arm that that checks start_bids == Some");
let mut channel = self
.start_bids
.take()
.expect("inside an arm that that checks start_bids == Some");
channel.close();
// TODO: if the timer is already running, report how much time is left for the bids
auction_is_open = true;
}

Ok(()) = async { self.start_timer.as_mut().unwrap().await }, if self.start_timer.is_some() => {
let mut channel = self.start_timer.take().expect("inside an arm that checks start_immer == Some");
let mut channel = self
.start_timer
.take()
.expect("inside an arm that checks start_timer == Some");
channel.close();
if !auction_is_open {
info!("received signal to start the auction timer before signal to process bids; that's ok but eats into the time allotment of the auction");
info!(
"received signal to start the auction timer before signal to start \
processing bids; that's ok but eats into the time allotment of the \
auction"
);
}

// TODO: Emit an event to report start and endpoint of the auction.
Expand All @@ -135,52 +218,9 @@ impl Worker {
}

else => {
bail!("all channels are closed; this auction cannot continue")
bail!("all channels are closed; the auction cannot continue")
}
}
};

let Some(winner) = auction_result else {
info!("auction ended with no winning bid");
return Ok(());
};

// TODO: report the pending nonce that we ended up using.
let transaction = Arc::unwrap_or_clone(winner)
.into_transaction_body(
self.pending_nonce.get(),
self.rollup_id,
&self.sequencer_key,
self.fee_asset_denomination.clone(),
self.sequencer_chain_id,
)
.sign(self.sequencer_key.signing_key());

// NOTE: Submit fire-and-forget style. If the submission didn't make it in time,
// it's likely lost.
// TODO: We can consider providing a very tight retry mechanism. Maybe resubmit once
// if the response didn't take too long? But it's probably a bad idea to even try.
// Can we detect if a submission failed due to a bad nonce? In this case, we could
// immediately ("optimistically") submit with the most recent pending nonce (if the
// publisher updated it in the meantime) or just nonce + 1 (if it didn't yet update)?
match self
.sequencer_abci_client
.submit_transaction_sync(transaction)
.await
.wrap_err("submission of the auction failed; it's likely lost")
{
Ok(resp) => {
// TODO: provide tx_sync response hash? Does it have extra meaning?
info!(
response.log = %resp.log,
response.code = resp.code.value(),
"auction winner submitted to sequencer",
);
}
Err(error) => {
error!(%error, "failed to submit auction winner to sequencer; it's likely lost");
}
}
Ok(())
}
}
Loading

0 comments on commit 779b36f

Please sign in to comment.