Skip to content

Commit

Permalink
transaction submission logic
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarreif committed Nov 5, 2024
1 parent 22baca2 commit 58ac5fd
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 55 deletions.
154 changes: 100 additions & 54 deletions crates/astria-auctioneer/src/auction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ use astria_eyre::eyre::{
OptionExt as _,
};
pub(crate) use builder::Builder;
use sequencer_client::Address;
use sequencer_client::{
tendermint_rpc::endpoint::broadcast::tx_sync,
Address,
SequencerClientExt,
};
use telemetry::display::base64;
use tokio::{
select,
Expand All @@ -31,13 +35,12 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{
error,
info,
instrument,
warn,
Instrument,
Span,
};
use tryhard::backoff_strategies::ExponentialBackoff;

use crate::{
bundle::Bundle,
Expand Down Expand Up @@ -189,10 +192,12 @@ impl Auction {
// set the timer
latency_margin_timer = Some(tokio::time::sleep(self.latency_margin));

// TODO: also want to fetch the pending nonce here (we wait for commit because we want the pending nonce from after the commit)
nonce_fetch = Some(tokio::task::spawn(async {
// TODO: fetch the pending nonce using the sequencer client with tryhard
Ok(0)
let client = self.sequencer_grpc_client.clone();
let address = self.sequencer_key.address().clone();
// we wait for commit because we want the pending nonce from after the commit
// TODO: fix lifetime issue with passing metrics here
nonce_fetch = Some(tokio::task::spawn(async move {
get_pending_nonce(client, address).await
}));
}

Expand All @@ -215,9 +220,14 @@ impl Auction {

// handle auction result
let transaction_body = auction_result
.wrap_err("")?
.wrap_err("auction failed unexpectedly")?
.ok_or_eyre("auction ended with no winning bid")?
.into_transaction_body(nonce, self.rollup_id, self.fee_asset_denomination.clone());
.into_transaction_body(
nonce,
self.rollup_id,
self.fee_asset_denomination.clone(),
self.sequencer_chain_id,
);

let transaction = transaction_body.sign(self.sequencer_key.signing_key());

Expand All @@ -228,56 +238,32 @@ impl Auction {
() = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal")),

// submit the transaction to the sequencer
result = self.submit_transaction(transaction) => {
result = submit_transaction(self.sequencer_abci_client.clone(), transaction, self.metrics) => {
// TODO: handle submission failure better?
result
match result {
Ok(resp) => {
// TODO: handle failed submission instead of just logging the result
info!(auction.id = %base64(self.auction_id), auction.result = %resp.log, "auction result submitted to sequencer");
Ok(())
},
Err(e) => {
error!(auction.id = %base64(self.auction_id), err = %e, "failed to submit auction result to sequencer");
Err(e).wrap_err("failed to submit auction result to sequencer")
},
}
}
};

submission_result
}

#[instrument(skip_all, fields(auction.id = %base64(self.auction_id), %address, err))]
async fn get_pending_nonce(&self, address: Address) -> eyre::Result<u32> {
let span = tracing::Span::current();
let retry_cfg = make_retry_cfg("get pending nonce".into(), span);
let client = self.sequencer_grpc_client.clone();

let nonce = tryhard::retry_fn(|| {
let mut client = client.clone();
let address = address.clone();

async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
}
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to get pending nonce")?
.into_inner()
.inner;

Ok(nonce)
}

async fn submit_transaction(&self, _transaction: Transaction) -> eyre::Result<()> {
unimplemented!()
}
}

fn make_retry_cfg(
msg: String,
span: Span,
) -> tryhard::RetryFutureConfig<
ExponentialBackoff,
impl Fn(u32, Option<Duration>, &tonic::Status) -> futures::future::Ready<()>,
> {
tryhard::RetryFutureConfig::new(1024)
#[instrument(skip_all, fields(%address, err))]
async fn get_pending_nonce(
client: SequencerServiceClient<tonic::transport::Channel>,
address: Address,
) -> eyre::Result<u32> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2))
.on_retry(
Expand All @@ -290,9 +276,69 @@ fn make_retry_cfg(
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to {msg} failed; retrying after backoff",
"attempt to get pending nonce failed; retrying after backoff",
);
futures::future::ready(())
},
);

let nonce = tryhard::retry_fn(|| {
let mut client = client.clone();
let address = address.clone();

async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
}
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to get pending nonce")?
.into_inner()
.inner;

Ok(nonce)
}

async fn submit_transaction(
client: sequencer_client::HttpClient,
transaction: Transaction,
_metrics: &'static Metrics,
) -> eyre::Result<tx_sync::Response> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2))
.on_retry(
move |attempt: u32,
next_delay: Option<Duration>,
error: &sequencer_client::extension_trait::Error| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
parent: &span,
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to submit transaction failed; retrying after backoff",
);
futures::future::ready(())
},
)
);

tryhard::retry_fn(|| {
let client = client.clone();
let transaction = transaction.clone();

async move { client.submit_transaction_sync(transaction).await }
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to submit transaction")
}
1 change: 0 additions & 1 deletion crates/astria-auctioneer/src/auctioneer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub(super) struct Auctioneer {
}

impl Auctioneer {
const AUCTION_DRIVER: &'static str = "auction_driver";
const OPTIMISTIC_EXECUTOR: &'static str = "optimistic_executor";
const _BUNDLE_COLLECTOR: &'static str = "bundle_collector";

Expand Down
1 change: 1 addition & 0 deletions crates/astria-auctioneer/src/block/executed_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub(crate) fn make_execution_requests_stream(
let (blocks_to_execute_tx, blocks_to_execute_rx) = mpsc::channel(16);
let blocks_to_execute_stream_rx = ReceiverStream::new(blocks_to_execute_rx);

// TODO: dont skip empty blocks
let requests = blocks_to_execute_stream_rx.filter_map(move |block: Optimistic| async move {
let base_block = block
.try_into_base_block(rollup_id)
Expand Down
3 changes: 3 additions & 0 deletions crates/astria-auctioneer/src/bundle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ impl Bundle {
nonce: u32,
rollup_id: RollupId,
fee_asset: asset::Denom,
chain_id: String,
) -> TransactionBody {
let data = self.into_raw().encode_to_vec();
// TODO: sign the bundle data and put it in a `SignedBundle` message or something

TransactionBody::builder()
.actions(vec![
Expand All @@ -84,6 +86,7 @@ impl Bundle {
.into(),
])
.nonce(nonce)
.chain_id(chain_id)
.try_build()
.expect("failed to build transaction body")
}
Expand Down

0 comments on commit 58ac5fd

Please sign in to comment.