From 58ac5fd2828e554365b7279a98946d6adb797587 Mon Sep 17 00:00:00 2001 From: itamar Date: Mon, 4 Nov 2024 19:14:52 -0500 Subject: [PATCH] transaction submission logic --- crates/astria-auctioneer/src/auction/mod.rs | 154 ++++++++++++------ .../astria-auctioneer/src/auctioneer/inner.rs | 1 - .../src/block/executed_stream.rs | 1 + crates/astria-auctioneer/src/bundle/mod.rs | 3 + 4 files changed, 104 insertions(+), 55 deletions(-) diff --git a/crates/astria-auctioneer/src/auction/mod.rs b/crates/astria-auctioneer/src/auction/mod.rs index af89c3a517..01a51a0d0b 100644 --- a/crates/astria-auctioneer/src/auction/mod.rs +++ b/crates/astria-auctioneer/src/auction/mod.rs @@ -20,7 +20,11 @@ use astria_eyre::eyre::{ OptionExt as _, }; pub(crate) use builder::Builder; -use sequencer_client::Address; +use sequencer_client::{ + tendermint_rpc::endpoint::broadcast::tx_sync, + Address, + SequencerClientExt, +}; use telemetry::display::base64; use tokio::{ select, @@ -31,13 +35,12 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{ + error, info, instrument, warn, Instrument, - Span, }; -use tryhard::backoff_strategies::ExponentialBackoff; use crate::{ bundle::Bundle, @@ -189,10 +192,12 @@ impl Auction { // set the timer latency_margin_timer = Some(tokio::time::sleep(self.latency_margin)); - // TODO: also want to fetch the pending nonce here (we wait for commit because we want the pending nonce from after the commit) - nonce_fetch = Some(tokio::task::spawn(async { - // TODO: fetch the pending nonce using the sequencer client with tryhard - Ok(0) + let client = self.sequencer_grpc_client.clone(); + let address = self.sequencer_key.address().clone(); + // we wait for commit because we want the pending nonce from after the commit + // TODO: fix lifetime issue with passing metrics here + nonce_fetch = Some(tokio::task::spawn(async move { + get_pending_nonce(client, address).await })); } @@ -215,9 +220,14 @@ impl Auction { // handle auction result let transaction_body = auction_result - .wrap_err("")? + .wrap_err("auction failed unexpectedly")? .ok_or_eyre("auction ended with no winning bid")? - .into_transaction_body(nonce, self.rollup_id, self.fee_asset_denomination.clone()); + .into_transaction_body( + nonce, + self.rollup_id, + self.fee_asset_denomination.clone(), + self.sequencer_chain_id, + ); let transaction = transaction_body.sign(self.sequencer_key.signing_key()); @@ -228,56 +238,32 @@ impl Auction { () = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal")), // submit the transaction to the sequencer - result = self.submit_transaction(transaction) => { + result = submit_transaction(self.sequencer_abci_client.clone(), transaction, self.metrics) => { // TODO: handle submission failure better? - result + match result { + Ok(resp) => { + // TODO: handle failed submission instead of just logging the result + info!(auction.id = %base64(self.auction_id), auction.result = %resp.log, "auction result submitted to sequencer"); + Ok(()) + }, + Err(e) => { + error!(auction.id = %base64(self.auction_id), err = %e, "failed to submit auction result to sequencer"); + Err(e).wrap_err("failed to submit auction result to sequencer") + }, + } } }; - submission_result } - - #[instrument(skip_all, fields(auction.id = %base64(self.auction_id), %address, err))] - async fn get_pending_nonce(&self, address: Address) -> eyre::Result { - let span = tracing::Span::current(); - let retry_cfg = make_retry_cfg("get pending nonce".into(), span); - let client = self.sequencer_grpc_client.clone(); - - let nonce = tryhard::retry_fn(|| { - let mut client = client.clone(); - let address = address.clone(); - - async move { - client - .get_pending_nonce(GetPendingNonceRequest { - address: Some(address.into_raw()), - }) - .await - } - }) - .with_config(retry_cfg) - .in_current_span() - .await - .wrap_err("failed to get pending nonce")? - .into_inner() - .inner; - - Ok(nonce) - } - - async fn submit_transaction(&self, _transaction: Transaction) -> eyre::Result<()> { - unimplemented!() - } } -fn make_retry_cfg( - msg: String, - span: Span, -) -> tryhard::RetryFutureConfig< - ExponentialBackoff, - impl Fn(u32, Option, &tonic::Status) -> futures::future::Ready<()>, -> { - tryhard::RetryFutureConfig::new(1024) +#[instrument(skip_all, fields(%address, err))] +async fn get_pending_nonce( + client: SequencerServiceClient, + address: Address, +) -> eyre::Result { + let span = tracing::Span::current(); + let retry_cfg = tryhard::RetryFutureConfig::new(1024) .exponential_backoff(Duration::from_millis(100)) .max_delay(Duration::from_secs(2)) .on_retry( @@ -290,9 +276,69 @@ fn make_retry_cfg( attempt, wait_duration, error = error as &dyn std::error::Error, - "attempt to {msg} failed; retrying after backoff", + "attempt to get pending nonce failed; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let nonce = tryhard::retry_fn(|| { + let mut client = client.clone(); + let address = address.clone(); + + async move { + client + .get_pending_nonce(GetPendingNonceRequest { + address: Some(address.into_raw()), + }) + .await + } + }) + .with_config(retry_cfg) + .in_current_span() + .await + .wrap_err("failed to get pending nonce")? + .into_inner() + .inner; + + Ok(nonce) +} + +async fn submit_transaction( + client: sequencer_client::HttpClient, + transaction: Transaction, + _metrics: &'static Metrics, +) -> eyre::Result { + let span = tracing::Span::current(); + let retry_cfg = tryhard::RetryFutureConfig::new(1024) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(2)) + .on_retry( + move |attempt: u32, + next_delay: Option, + error: &sequencer_client::extension_trait::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + parent: &span, + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to submit transaction failed; retrying after backoff", ); futures::future::ready(()) }, - ) + ); + + tryhard::retry_fn(|| { + let client = client.clone(); + let transaction = transaction.clone(); + + async move { client.submit_transaction_sync(transaction).await } + }) + .with_config(retry_cfg) + .in_current_span() + .await + .wrap_err("failed to submit transaction") } diff --git a/crates/astria-auctioneer/src/auctioneer/inner.rs b/crates/astria-auctioneer/src/auctioneer/inner.rs index 577a238c87..f34d44bf49 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner.rs @@ -36,7 +36,6 @@ pub(super) struct Auctioneer { } impl Auctioneer { - const AUCTION_DRIVER: &'static str = "auction_driver"; const OPTIMISTIC_EXECUTOR: &'static str = "optimistic_executor"; const _BUNDLE_COLLECTOR: &'static str = "bundle_collector"; diff --git a/crates/astria-auctioneer/src/block/executed_stream.rs b/crates/astria-auctioneer/src/block/executed_stream.rs index 725e8e5f5e..9bcee7cddb 100644 --- a/crates/astria-auctioneer/src/block/executed_stream.rs +++ b/crates/astria-auctioneer/src/block/executed_stream.rs @@ -95,6 +95,7 @@ pub(crate) fn make_execution_requests_stream( let (blocks_to_execute_tx, blocks_to_execute_rx) = mpsc::channel(16); let blocks_to_execute_stream_rx = ReceiverStream::new(blocks_to_execute_rx); + // TODO: dont skip empty blocks let requests = blocks_to_execute_stream_rx.filter_map(move |block: Optimistic| async move { let base_block = block .try_into_base_block(rollup_id) diff --git a/crates/astria-auctioneer/src/bundle/mod.rs b/crates/astria-auctioneer/src/bundle/mod.rs index 5758504387..4029b5ab59 100644 --- a/crates/astria-auctioneer/src/bundle/mod.rs +++ b/crates/astria-auctioneer/src/bundle/mod.rs @@ -71,8 +71,10 @@ impl Bundle { nonce: u32, rollup_id: RollupId, fee_asset: asset::Denom, + chain_id: String, ) -> TransactionBody { let data = self.into_raw().encode_to_vec(); + // TODO: sign the bundle data and put it in a `SignedBundle` message or something TransactionBody::builder() .actions(vec![ @@ -84,6 +86,7 @@ impl Bundle { .into(), ]) .nonce(nonce) + .chain_id(chain_id) .try_build() .expect("failed to build transaction body") }