diff --git a/crates/astria-auctioneer/src/auctioneer/inner/auction/factory.rs b/crates/astria-auctioneer/src/auctioneer/inner/auction/factory.rs index 959c5c75d0..18a5465e79 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner/auction/factory.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner/auction/factory.rs @@ -2,8 +2,8 @@ /// associated handles. use astria_core::{ primitive::v1::{ - asset, RollupId, + asset, }, sequencerblock::v1::block::FilteredSequencerBlock, }; @@ -11,6 +11,7 @@ use tokio::sync::{ mpsc, oneshot, }; +use tokio_util::sync::CancellationToken; use super::{ Auction, @@ -29,6 +30,7 @@ pub(in crate::auctioneer::inner) struct Factory { pub(in crate::auctioneer::inner) sequencer_chain_id: String, pub(in crate::auctioneer::inner) rollup_id: RollupId, pub(in crate::auctioneer::inner) pending_nonce: PendingNonceSubscriber, + pub(in crate::auctioneer::inner) cancellation_token: CancellationToken, } impl Factory { @@ -45,6 +47,7 @@ impl Factory { let (start_timer_tx, start_timer_rx) = oneshot::channel(); let (bundles_tx, bundles_rx) = mpsc::unbounded_channel(); + let cancellation_token = self.cancellation_token.child_token(); let auction = Worker { sequencer_abci_client: self.sequencer_abci_client.clone(), start_bids: Some(start_bids_rx), @@ -57,6 +60,7 @@ impl Factory { sequencer_chain_id: self.sequencer_chain_id.clone(), rollup_id: self.rollup_id, pending_nonce: self.pending_nonce.clone(), + cancellation_token: cancellation_token.clone(), }; Auction { @@ -67,6 +71,7 @@ impl Factory { start_bids: Some(start_bids_tx), start_timer: Some(start_timer_tx), bundles: bundles_tx, + cancellation_token, worker: tokio::task::spawn(auction.run()), } } diff --git a/crates/astria-auctioneer/src/auctioneer/inner/auction/mod.rs b/crates/astria-auctioneer/src/auctioneer/inner/auction/mod.rs index c6d6b9e9f4..3ca9275ab8 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner/auction/mod.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner/auction/mod.rs @@ 
-37,7 +37,10 @@ //! as it received the signal to start the timer. This corresponds to the sequencer block being //! committed, thus providing the latest pending nonce. -use std::sync::Arc; +use std::{ + fmt::Display, + sync::Arc, +}; use astria_core::{ self, @@ -45,15 +48,16 @@ use astria_core::{ }; use astria_eyre::eyre::{ self, + Context, bail, ensure, eyre, - Context, }; use futures::{ Future, FutureExt as _, }; +use sequencer_client::tendermint_rpc::endpoint::broadcast::tx_sync; use telemetry::display::base64; use tokio::{ sync::{ @@ -62,13 +66,13 @@ use tokio::{ }, task::JoinHandle, }; +use tokio_util::sync::CancellationToken; use tracing::instrument; use super::PendingNonceSubscriber; use crate::{ block::Commitment, bundle::Bundle, - flatten_join_result, sequencer_key::SequencerKey, }; @@ -105,7 +109,8 @@ pub(super) struct Auction { start_bids: Option>, start_timer: Option>, bundles: mpsc::UnboundedSender>, - worker: JoinHandle>, + cancellation_token: CancellationToken, + worker: JoinHandle>, } impl Auction { @@ -113,6 +118,10 @@ impl Auction { self.worker.abort(); } + pub(super) fn cancel(&self) { + self.cancellation_token.cancel() + } + pub(in crate::auctioneer::inner) fn id(&self) -> &Id { &self.id } @@ -206,13 +215,36 @@ impl Auction { } impl Future for Auction { - type Output = (Id, eyre::Result<()>); + type Output = (Id, Result<(), tokio::task::JoinError>); fn poll( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { let res = std::task::ready!(self.worker.poll_unpin(cx)); - std::task::Poll::Ready((self.id, flatten_join_result(res))) + std::task::Poll::Ready((self.id, res.map(|_| ()))) + } +} + +pub(super) enum Summary { + CancelledDuringAuction, + NoBids, + Submitted(tx_sync::Response), +} + +impl Display for Summary { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Summary::CancelledDuringAuction => { + f.write_str("received cancellation signal during auction loop") + } + 
Summary::NoBids => f.write_str("auction finished without bids"), + Summary::Submitted(rsp) => write!( + f, + "auction winner submitted; Sequencer responed with ABCI code `{}`, log `{}`", + rsp.code.value(), + rsp.log, + ), + } } } diff --git a/crates/astria-auctioneer/src/auctioneer/inner/auction/worker.rs b/crates/astria-auctioneer/src/auctioneer/inner/auction/worker.rs index 7f1bd5c179..891c619143 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner/auction/worker.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner/auction/worker.rs @@ -43,29 +43,30 @@ use std::{ }; use astria_core::primitive::v1::{ - asset, RollupId, + asset, }; use astria_eyre::eyre::{ self, - bail, Context, - OptionExt as _, + bail, }; use sequencer_client::SequencerClientExt; use tokio::{ select, sync::oneshot, }; +use tokio_util::sync::CancellationToken; use tracing::{ - error, + Level, info, instrument, }; use super::{ - allocation_rule::FirstPrice, PendingNonceSubscriber, + Summary, + allocation_rule::FirstPrice, }; use crate::{ bundle::Bundle, @@ -92,37 +93,119 @@ pub(super) struct Worker { /// Rollup ID to submit the auction result to pub(super) rollup_id: RollupId, pub(super) pending_nonce: PendingNonceSubscriber, + pub(super) cancellation_token: CancellationToken, } impl Worker { - #[instrument(skip_all, fields(id = %self.id))] - pub(super) async fn run(mut self) -> eyre::Result<()> { + // FIXME: consider using Valuable for the return case. + // See this discussion: https://github.com/tokio-rs/tracing/discussions/1906 + #[instrument( + skip_all, + fields(id = %self.id), + err(level = Level::WARN, Display), + ret(Display), + )] + pub(super) async fn run(mut self) -> eyre::Result { + let Some(auction_result) = self + .cancellation_token + .clone() + .run_until_cancelled(self.run_auction_loop()) + .await + else { + return Ok(Summary::CancelledDuringAuction); + }; + let Some(winner) = auction_result.wrap_err("auction failed while waiting for bids")? 
else { + return Ok(Summary::NoBids); + }; + + // TODO: report the pending nonce that we ended up using. + let transaction = Arc::unwrap_or_clone(winner) + .into_transaction_body( + self.pending_nonce.get(), + self.rollup_id, + &self.sequencer_key, + self.fee_asset_denomination.clone(), + self.sequencer_chain_id, + ) + .sign(self.sequencer_key.signing_key()); + + // NOTE: Submit fire-and-forget style. If the submission didn't make it in time, + // it's likey lost. + // TODO: We can consider providing a very tight retry mechanism. Maybe resubmit once + // if the response didn't take too long? But it's probably a bad idea to even try. + // Can we detect if a submission failed due to a bad nonce? In this case, we could + // immediately ("optimistically") submit with the most recent pending nonce (if the + // publisher updated it in the meantime) or just nonce + 1 (if it didn't yet update)? + + let submission_fut = { + let client = self.sequencer_abci_client.clone(); + tokio::time::timeout(Duration::from_secs(30), async move { + client + .submit_transaction_sync(transaction) + .await + .wrap_err("submission request failed") + }) + }; + tokio::pin!(submission_fut); + + loop { + select!( + () = self.cancellation_token.clone().cancelled_owned(), if !self.cancellation_token.is_cancelled() => { + info!( + "received cancellation token while waiting for Sequencer to respond to \ + transaction submission; still waiting for submission until timeout" + ); + } + + res = &mut submission_fut => { + break match res + .wrap_err("submission of auction winner timed out before receiving a response from Sequencer") + { + Ok(Ok(rsp)) => Ok(Summary::Submitted(rsp)), + Err(err) | Ok(Err(err)) => Err(err), + } + } + ) + } + } + + async fn run_auction_loop(&mut self) -> eyre::Result>> { let mut latency_margin_timer = None; // TODO: do we want to make this configurable to allow for more complex allocation rules? 
let mut allocation_rule = FirstPrice::new(); let mut auction_is_open = false; - let auction_result = loop { + loop { select! { biased; _ = async { latency_margin_timer.as_mut().unwrap() }, if latency_margin_timer.is_some() => { info!("timer is up; bids left unprocessed: {}", self.bundles.len()); - break allocation_rule.winner(); + break Ok(allocation_rule.winner()); } Ok(()) = async { self.start_bids.as_mut().unwrap().await }, if self.start_bids.is_some() => { - let mut channel = self.start_bids.take().expect("inside an arm that that checks start_bids == Some"); + let mut channel = self + .start_bids + .take() + .expect("inside an arm that that checks start_bids == Some"); channel.close(); // TODO: if the timer is already running, report how much time is left for the bids auction_is_open = true; } Ok(()) = async { self.start_timer.as_mut().unwrap().await }, if self.start_timer.is_some() => { - let mut channel = self.start_timer.take().expect("inside an arm that checks start_immer == Some"); + let mut channel = self + .start_timer + .take() + .expect("inside an arm that checks start_timer == Some"); channel.close(); if !auction_is_open { - info!("received signal to start the auction timer before signal to process bids; that's ok but eats into the time allotment of the auction"); + info!( + "received signal to start the auction timer before signal to start \ + processing bids; that's ok but eats into the time allotment of the \ + auction" + ); } // TODO: Emit an event to report start and endpoint of the auction. @@ -135,52 +218,9 @@ impl Worker { } else => { - bail!("all channels are closed; this auction cannot continue") + bail!("all channels are closed; the auction cannot continue") } } - }; - - let Some(winner) = auction_result else { - info!("auction ended with no winning bid"); - return Ok(()); - }; - - // TODO: report the pending nonce that we ended up using. 
- let transaction = Arc::unwrap_or_clone(winner) - .into_transaction_body( - self.pending_nonce.get(), - self.rollup_id, - &self.sequencer_key, - self.fee_asset_denomination.clone(), - self.sequencer_chain_id, - ) - .sign(self.sequencer_key.signing_key()); - - // NOTE: Submit fire-and-forget style. If the submission didn't make it in time, - // it's likey lost. - // TODO: We can consider providing a very tight retry mechanism. Maybe resubmit once - // if the response didn't take too long? But it's probably a bad idea to even try. - // Can we detect if a submission failed due to a bad nonce? In this case, we could - // immediately ("optimistically") submit with the most recent pending nonce (if the - // publisher updated it in the meantime) or just nonce + 1 (if it didn't yet update)? - match self - .sequencer_abci_client - .submit_transaction_sync(transaction) - .await - .wrap_err("submission of the auction failed; it's likely lost") - { - Ok(resp) => { - // TODO: provide tx_sync response hash? Does it have extra meaning? 
- info!( - response.log = %resp.log, - response.code = resp.code.value(), - "auction winner submitted to sequencer", - ); - } - Err(error) => { - error!(%error, "failed to submit auction winner to sequencer; it's likely lost"); - } } - Ok(()) } } diff --git a/crates/astria-auctioneer/src/auctioneer/inner/mod.rs b/crates/astria-auctioneer/src/auctioneer/inner/mod.rs index 986dabfe81..d97af156d8 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner/mod.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner/mod.rs @@ -12,14 +12,14 @@ use astria_core::{ }; use astria_eyre::eyre::{ self, - bail, OptionExt as _, WrapErr as _, + bail, }; use futures::{ - stream::FuturesUnordered, Future, StreamExt as _, + stream::FuturesUnordered, }; use tokio::{ select, @@ -27,17 +27,19 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ + Level, + Span, error, field, info, info_span, instrument, warn, - Level, - Span, }; use crate::{ + Config, + Metrics, rollup_channel::{ BundleStream, ExecuteOptimisticBlockStream, @@ -48,8 +50,6 @@ use crate::{ SequencerChannel, }, sequencer_key::SequencerKey, - Config, - Metrics, }; mod auction; @@ -116,6 +116,7 @@ impl Inner { sequencer_chain_id, rollup_id, pending_nonce: pending_nonce.subscribe(), + cancellation_token: shutdown_token.child_token(), }; Ok(Self { @@ -198,26 +199,30 @@ impl Inner { /// Handles the result of an auction running to completion. /// - /// This method only exists to emit the auction result (only error right now) under a span. + /// This method only exists to ensure that panicking auctions receive an event. + /// It is assumed that auctions that ran to completion (returnin a success or failure) + /// will emit an event in their own span. 
#[instrument(skip_all, fields(%auction_id), err)] fn handle_completed_auction( &mut self, auction_id: auction::Id, - res: eyre::Result<()>, - ) -> eyre::Result<()> { + res: Result<(), tokio::task::JoinError>, + ) -> Result<(), tokio::task::JoinError> { self.running_auction.take(); res } /// Handles the result of cancelled auctions. /// - /// This method only exists to emit the auction result (only error right now) under a span. + /// This method only exists to ensure that panicking auctions receive an event. + /// It is assumed that auctions that ran to completion (returning a success or failure) + /// will emit an event in their own span. #[instrument(skip_all, fields(%auction_id), err(level = Level::INFO))] fn handle_cancelled_auction( &self, auction_id: auction::Id, - res: eyre::Result<()>, - ) -> eyre::Result<()> { + res: Result<(), tokio::task::JoinError>, + ) -> Result<(), tokio::task::JoinError> { res } @@ -235,8 +240,8 @@ impl Inner { info!(auction_id = %new_auction.id(), "started new auction"); if let Some(old_auction) = self.running_auction.replace(new_auction) { - old_auction.abort(); - info!(auction_id = %old_auction.id(), "cancelled old auction"); + old_auction.cancel(); + info!(auction_id = %old_auction.id(), "cancelled running auction"); self.cancelled_auctions.push(old_auction); } @@ -378,8 +383,8 @@ impl PendingNoncePublisher { fn new(channel: SequencerChannel, address: Address) -> Self { use tokio::time::{ - timeout_at, MissedTickBehavior, + timeout_at, }; // TODO: make this configurable. Right now they assume a Sequencer block time of 2s, // so this is fetching nonce up to 4 times a block.