diff --git a/crates/astria-auctioneer/src/auction/bid.rs b/crates/astria-auctioneer/src/auction/bid.rs index 6802657d98..d123aeda0b 100644 --- a/crates/astria-auctioneer/src/auction/bid.rs +++ b/crates/astria-auctioneer/src/auction/bid.rs @@ -34,13 +34,12 @@ impl Bundle { unimplemented!() } - pub(crate) fn into_transaction(self) -> Transaction { + pub(crate) fn into_transaction(self, _nonce: u64) -> Transaction { unimplemented!() } - pub(crate) fn bid(&self) -> Bid { - let bundle = self.clone(); - Bid::from_bundle(bundle) + pub(crate) fn into_bid(self) -> Bid { + Bid::from_bundle(self) } } diff --git a/crates/astria-auctioneer/src/auction/builder.rs b/crates/astria-auctioneer/src/auction/builder.rs index 73921907b8..716b123a98 100644 --- a/crates/astria-auctioneer/src/auction/builder.rs +++ b/crates/astria-auctioneer/src/auction/builder.rs @@ -1,14 +1,75 @@ +use std::time::Duration; + use astria_eyre::eyre; +use tokio::sync::{ + mpsc, + oneshot, +}; +use tokio_util::sync::CancellationToken; -use super::driver::Driver; +use super::{ + BundlesHandle, + Driver, + Id, + OptimisticExecutionHandle, +}; use crate::Metrics; pub(crate) struct Builder { pub(crate) metrics: &'static Metrics, + pub(crate) shutdown_token: CancellationToken, + + /// The endpoint for the sequencer gRPC service used to get pending nonces + pub(crate) sequencer_grpc_endpoint: String, + /// The endpoint for the sequencer ABCI service used to submit transactions + pub(crate) sequencer_abci_endpoint: String, + /// The amount of time to wait after a commit before closing the auction for bids and + /// submitting the resulting transaction + pub(crate) latency_margin: Duration, + /// The ID of the auction to be run + pub(crate) auction_id: Id, } impl Builder { - pub(crate) fn build(self) -> eyre::Result { - unimplemented!() + pub(crate) fn build(self) -> eyre::Result<(Driver, OptimisticExecutionHandle, BundlesHandle)> { + let Self { + metrics, + shutdown_token, + sequencer_grpc_endpoint, + sequencer_abci_endpoint, + latency_margin, + auction_id, + } = self; + + let (executed_block_tx, executed_block_rx) = oneshot::channel(); + let (block_commitment_tx, block_commitment_rx) = oneshot::channel(); + let (reorg_tx, reorg_rx) = oneshot::channel(); + // TODO: get the capacity from config or something instead of using a magic number + let (new_bids_tx, new_bids_rx) = mpsc::channel(16); + + let driver = Driver { + metrics, + shutdown_token, + sequencer_grpc_endpoint, + sequencer_abci_endpoint, + executed_block_rx, + block_commitment_rx, + reorg_rx, + new_bids_rx, + auction_id, + latency_margin, + }; + + Ok(( + driver, + OptimisticExecutionHandle { + executed_block_tx: Some(executed_block_tx), + block_commitment_tx: Some(block_commitment_tx), + reorg_tx: Some(reorg_tx), + }, + BundlesHandle { + new_bids_tx, + }, + )) } } diff --git a/crates/astria-auctioneer/src/auction/driver.rs b/crates/astria-auctioneer/src/auction/driver.rs deleted file mode 100644 index ae0be946b2..0000000000 --- a/crates/astria-auctioneer/src/auction/driver.rs +++ /dev/null @@ -1,180 +0,0 @@ -use std::time::Duration; - -use astria_core::protocol::transaction::v1::Transaction; -use astria_eyre::eyre::{ - self, - eyre, - Context, -}; -use tokio::{ - select, - sync::{ - mpsc, - oneshot, - }, -}; -use tokio_util::sync::CancellationToken; - -use super::{ - bid::Bid, - Auction, -}; -use crate::Metrics; - -pub(crate) struct Handle { - executed_block_tx: Option>, - block_commitments_tx: Option>, - reorg_tx: Option>, -} - -impl Handle { - pub(crate) async fn send_bundle(&self) -> eyre::Result<()> { - unimplemented!() - } - - pub(crate) fn executed_block(&mut self) -> eyre::Result<()> { - let _ = self - .executed_block_tx - .take() - .expect("should only send executed signal to a given auction once") - .send(()); - Ok(()) - } - - pub(crate) fn block_commitment(&mut self) -> eyre::Result<()> { - let _ = self - .block_commitments_tx - .take() - .expect("should only send block commitment signal to a given auction once") - .send(()); - - Ok(()) - } - - pub(crate) fn reorg(&mut self) -> eyre::Result<()> { - let _ = self - .reorg_tx - .take() - .expect("should only send reorg signal to a given auction once"); - - Ok(()) - } -} - -pub(crate) struct Driver { - #[allow(dead_code)] - metrics: &'static Metrics, - shutdown_token: CancellationToken, - - /// The time between receiving a block commitment - latency_margin: Duration, - /// Channel for receiving the executed block signal to start processing bundles - executed_block_rx: oneshot::Receiver<()>, - /// Channel for receiving the block commitment signal to start the latency margin timer - block_commitments_rx: oneshot::Receiver<()>, - /// Channel for receiving the reorg signal - reorg_rx: oneshot::Receiver<()>, - /// Channel for receiving new bundles - new_bids_rx: mpsc::Receiver, - /// The current state of the auction - auction: Auction, -} - -impl Driver { - pub(crate) async fn run(mut self) -> eyre::Result<()> { - // TODO: should the timer be inside the auction so that we only have one option? - let mut latency_margin_timer = None; - let mut auction: Option = None; - - let mut nonce_fetch: Option>> = None; - - let auction_result = loop { - select! { - biased; - - () = self.shutdown_token.cancelled() => break Err(eyre!("received shutdown signal")), - - signal = &mut self.reorg_rx => { - match signal { - Ok(()) => { - break Err(eyre!("reorg signal received")) - } - Err(_) => { - return Err(eyre!("reorg signal channel closed")); - } - } - // - } - - // get the auction winner when the timer expires - // TODO: should this also be conditioned on auction.is_some()? this feels redundant as we only populate the timer if the auction isnt none - _ = async { latency_margin_timer.as_mut().unwrap() }, if latency_margin_timer.is_some() => { - break Ok(auction.unwrap().winner()); - } - - signal = &mut self.executed_block_rx, if auction.is_none() => { - if let Err(e) = signal { - break Err(eyre!("exec signal channel closed")).wrap_err(e); - } - // set auction to open so it starts collecting bids - auction = Some(Auction::new()); - } - - signal = &mut self.block_commitments_rx, if auction.is_some() => { - if let Err(e) = signal { - break Err(eyre!("commit signal channel closed")).wrap_err(e); - } - // set the timer - latency_margin_timer = Some(tokio::time::sleep(self.latency_margin)); - - // TODO: also want to fetch the pending nonce here (we wait for commit because we want the pending nonce from after the commit) - nonce_fetch = Some(tokio::task::spawn(async { - // TODO: fetch the pending nonce using the sequencer client with tryhard - Ok(0) - })); - } - - // TODO: new bundles from the bundle stream if auction exists? - // - add the bid to the auction if executed - - } - // submit the auction result to the sequencer/wait for cancellation signal - // 1. result from submit_fut if !submission.terminated() - }; - - // handle auction result - let transaction = match auction_result { - Ok(winner) => winner.into_transaction(), - Err(e) => { - return Err(e); - } - }; - - // await the nonce fetch result - // TODO: flatten this or get rid of the option somehow - let nonce = nonce_fetch - .expect("should have received commit to exit the bid loop") - .await - .wrap_err("task failed")? - .wrap_err("failed to fetch nonce")?; - - let submission_result = select! { - biased; - - // TODO: should this be Ok() or something? - () = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal")), - - // submit the transaction to the sequencer - result = self.submit_transaction(transaction) => { - // TODO: handle submission failure better? - result - } - }; - - submission_result - } - - async fn submit_transaction(&self, _transaction: Transaction) -> eyre::Result<()> { - unimplemented!() - } -} diff --git a/crates/astria-auctioneer/src/auction/mod.rs b/crates/astria-auctioneer/src/auction/mod.rs index ad62b13e07..dcbc1f8eaa 100644 --- a/crates/astria-auctioneer/src/auction/mod.rs +++ b/crates/astria-auctioneer/src/auction/mod.rs @@ -1,10 +1,28 @@ -use crate::block; - mod bid; mod builder; -mod driver; -use bid::Bundle; -pub(crate) use driver::Handle; +use std::time::Duration; + +use astria_core::protocol::transaction::v1::Transaction; +use astria_eyre::eyre::{ + self, + eyre, + Context, +}; +use bid::{ + Bid, + Bundle, +}; +pub(crate) use builder::Builder; +use tokio::{ + select, + sync::{ + mpsc, + oneshot, + }, +}; +use tokio_util::sync::CancellationToken; + +use crate::Metrics; #[derive(Hash, Eq, PartialEq, Clone, Copy)] pub(crate) struct Id([u8; 32]); @@ -26,7 +44,7 @@ impl Auction { } } - fn bid(&mut self, bid: Bundle) -> bool { + fn bid(&mut self, _bid: Bundle) -> bool { // save the bid if its higher than self.highest_bid unimplemented!() } @@ -35,3 +53,185 @@ impl Auction { unimplemented!() } } + +pub(crate) struct OptimisticExecutionHandle { + executed_block_tx: Option>, + block_commitment_tx: Option>, + reorg_tx: Option>, +} + +impl OptimisticExecutionHandle { + pub(crate) async fn send_bundle(&self) -> eyre::Result<()> { + unimplemented!() + } + + pub(crate) fn executed_block(&mut self) -> eyre::Result<()> { + let _ = self + .executed_block_tx + .take() + .expect("should only send executed signal to a given auction once") + .send(()); + Ok(()) + } + + pub(crate) fn block_commitment(&mut self) -> eyre::Result<()> { + let _ = self + .block_commitment_tx + .take() + .expect("should only send block commitment signal to a given auction once") + .send(()); + + Ok(()) + } + + pub(crate) fn reorg(&mut self) -> eyre::Result<()> { + let _ = self + .reorg_tx + .take() + .expect("should only send reorg signal to a given auction once"); + + Ok(()) + } +} + +pub(crate) struct BundlesHandle { + new_bids_tx: mpsc::Sender, +} + +impl BundlesHandle { + pub(crate) fn send_bundle_timeout(&mut self, bundle: Bundle) -> eyre::Result<()> { + const BUNDLE_TIMEOUT: Duration = Duration::from_millis(100); + + let bid = bundle.into_bid(); + + self.new_bids_tx + .try_send(bid) + .wrap_err("bid channel full")?; + + Ok(()) + } +} + +// TODO: should this be the same object as the auction? +pub(crate) struct Driver { + #[allow(dead_code)] + metrics: &'static Metrics, + shutdown_token: CancellationToken, + + /// The endpoint for the sequencer's gRPC service, used for fetching pending nonces + sequencer_grpc_endpoint: String, + /// The endpoint for the sequencer's ABCI server, used for submitting transactions + sequencer_abci_endpoint: String, + /// Channel for receiving the executed block signal to start processing bundles + executed_block_rx: oneshot::Receiver<()>, + /// Channel for receiving the block commitment signal to start the latency margin timer + block_commitment_rx: oneshot::Receiver<()>, + /// Channel for receiving the reorg signal + reorg_rx: oneshot::Receiver<()>, + /// Channel for receiving new bundles + new_bids_rx: mpsc::Receiver, + /// The time between receiving a block commitment + latency_margin: Duration, + /// The ID of the auction + auction_id: Id, +} + +impl Driver { + pub(crate) async fn run(mut self) -> eyre::Result<()> { + // TODO: should the timer be inside the auction so that we only have one option? + let mut latency_margin_timer = None; + let mut auction: Option = None; + + let mut nonce_fetch: Option>> = None; + + let auction_result = loop { + select! { + biased; + + () = self.shutdown_token.cancelled() => break Err(eyre!("received shutdown signal")), + + signal = &mut self.reorg_rx => { + match signal { + Ok(()) => { + break Err(eyre!("reorg signal received")) + } + Err(_) => { + return Err(eyre!("reorg signal channel closed")); + } + } + // + } + + // get the auction winner when the timer expires + // TODO: should this also be conditioned on auction.is_some()? this feels redundant as we only populate the timer if the auction isnt none + _ = async { latency_margin_timer.as_mut().unwrap() }, if latency_margin_timer.is_some() => { + break Ok(auction.unwrap().winner()); + } + + signal = &mut self.executed_block_rx, if auction.is_none() => { + if let Err(e) = signal { + break Err(eyre!("exec signal channel closed")).wrap_err(e); + } + // set auction to open so it starts collecting bids + auction = Some(Auction::new()); + } + + signal = &mut self.block_commitment_rx, if auction.is_some() => { + if let Err(e) = signal { + break Err(eyre!("commit signal channel closed")).wrap_err(e); + } + // set the timer + latency_margin_timer = Some(tokio::time::sleep(self.latency_margin)); + + // TODO: also want to fetch the pending nonce here (we wait for commit because we want the pending nonce from after the commit) + nonce_fetch = Some(tokio::task::spawn(async { + // TODO: fetch the pending nonce using the sequencer client with tryhard + Ok(0) + })); + } + + // TODO: new bundles from the bundle stream if auction exists? + // - add the bid to the auction if executed + + } + // submit the auction result to the sequencer/wait for cancellation signal + // 1. result from submit_fut if !submission.terminated() + }; + + // await the nonce fetch result + // TODO: flatten this or get rid of the option somehow + let nonce = nonce_fetch + .expect("should have received commit to exit the bid loop") + .await + .wrap_err("task failed")? + .wrap_err("failed to fetch nonce")?; + + // handle auction result + let transaction = match auction_result { + // TODO: add signer + Ok(winner) => winner.into_transaction(nonce), + Err(e) => { + return Err(e); + } + }; + + let submission_result = select! { + biased; + + // TODO: should this be Ok() or something? + () = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal")), + + // submit the transaction to the sequencer + result = self.submit_transaction(transaction) => { + // TODO: handle submission failure better? + result + } + }; + + submission_result + } + + async fn submit_transaction(&self, _transaction: Transaction) -> eyre::Result<()> { + unimplemented!() + } +} diff --git a/crates/astria-auctioneer/src/auctioneer/inner.rs b/crates/astria-auctioneer/src/auctioneer/inner.rs index e9aea3f5b1..8bb38bd7fe 100644 --- a/crates/astria-auctioneer/src/auctioneer/inner.rs +++ b/crates/astria-auctioneer/src/auctioneer/inner.rs @@ -60,6 +60,7 @@ impl Auctioneer { rollup_id, optimistic_execution_grpc_endpoint: todo!(), bundle_grpc_endpoint: todo!(), + latency_margin: todo!(), } .build() .wrap_err("failed to initialize the optimistic executor")?; diff --git a/crates/astria-auctioneer/src/optimistic_executor/builder.rs b/crates/astria-auctioneer/src/optimistic_executor/builder.rs index ae224cb681..6f27d6233d 100644 --- a/crates/astria-auctioneer/src/optimistic_executor/builder.rs +++ b/crates/astria-auctioneer/src/optimistic_executor/builder.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use astria_core::primitive::v1::RollupId; use astria_eyre::eyre; use tokio_util::sync::CancellationToken; @@ -16,6 +18,8 @@ pub(crate) struct Builder { pub(crate) optimistic_execution_grpc_endpoint: String, /// The endpoint for the rollup's bundle gRPC service pub(crate) bundle_grpc_endpoint: String, + /// The amount of time to wait after a commit before closing the auction + pub(crate) latency_margin: Duration, } impl Builder { @@ -27,6 +31,7 @@ impl Builder { rollup_id, optimistic_execution_grpc_endpoint, bundle_grpc_endpoint, + latency_margin, } = self; let rollup_id = RollupId::from_unhashed_bytes(&rollup_id); @@ -38,6 +43,7 @@ impl Builder { rollup_id, optimistic_execution_grpc_endpoint, bundle_grpc_endpoint, + latency_margin, }) } } diff --git a/crates/astria-auctioneer/src/optimistic_executor/mod.rs b/crates/astria-auctioneer/src/optimistic_executor/mod.rs index 58e0570c6a..ea2d34ee85 100644 --- a/crates/astria-auctioneer/src/optimistic_executor/mod.rs +++ b/crates/astria-auctioneer/src/optimistic_executor/mod.rs @@ -45,9 +45,11 @@ pub(crate) struct OptimisticExecutor { metrics: &'static crate::Metrics, shutdown_token: CancellationToken, sequencer_grpc_endpoint: String, + sequencer_abci_endpoint: String, rollup_id: RollupId, optimistic_execution_grpc_endpoint: String, bundle_grpc_endpoint: String, + latency_margin: Duration, } impl OptimisticExecutor { @@ -78,7 +80,10 @@ impl OptimisticExecutor { // maybe just make this a fused future `auction_fut` let mut auction_futs: JoinMap> = JoinMap::new(); - let mut auction_handles: HashMap = HashMap::new(); + let mut auction_oe_handles: HashMap = + HashMap::new(); + let mut auction_bundle_handles: HashMap = + HashMap::new(); let mut curr_block: Option = None; @@ -107,31 +112,43 @@ impl OptimisticExecutor { }; let opt = block::Optimistic::try_from_raw(rsp.block.unwrap()).wrap_err("failed to parse incoming optimistic block")?; - // TODO: should i have an if/let statement that logs if its none, instead? - // - this will probably only change once - might want to move that into startup instead of having the option - // - if the stream.next() returns the opt after sending it to the opt exec api (see note below), we don't need to clone here - let next_block = CurrentBlock::opt(opt.clone()); - // 1. reorg by shutting down current auction fut, dumping the oneshot (this is the // state transition) - if let Some(hash) = curr_block.as_ref().map(|block| block.sequencer_block_hash()) { + if let Some(block) = curr_block { + let hash = block.sequencer_block_hash(); let auction_id = auction::Id::from_sequencer_block_hash(hash); - let handle = auction_handles.get_mut(&auction_id).ok_or_eyre("unable to get handle for current auction")?; + let handle = auction_oe_handles.get_mut(&auction_id).ok_or_eyre("unable to get handle for current auction")?; handle.reorg()?; }; + // TODO: should i have an if/let statement that logs if its none, instead? + // - this will probably only change once - might want to move that into startup instead of having the option + // - if the stream.next() returns the opt after sending it to the opt exec api (see note below), we don't need to clone here + // TODO: log the dumping of curr block? + curr_block = Some(CurrentBlock::opt(opt.clone())); + + // create and run the auction fut and save the its handles + // TODO: get rid of this expect, probably by getting rid of the option on curr block. + let block_hash = curr_block.take().expect("set this to Some already").sequencer_block_hash(); + let auction_id = auction::Id::from_sequencer_block_hash(block_hash); + let (auction_driver, optimistic_execution_handle, bundles_handle) = auction::Builder { + metrics: self.metrics, + shutdown_token: self.shutdown_token.clone(), + sequencer_grpc_endpoint: self.sequencer_grpc_endpoint.clone(), + sequencer_abci_endpoint: self.sequencer_abci_endpoint.clone(), + latency_margin: self.latency_margin, + auction_id, + }.build().wrap_err("failed to build auction")?; + + auction_futs.spawn(auction_id, auction_driver.run()); + auction_oe_handles.insert(auction_id, optimistic_execution_handle); + auction_bundle_handles.insert(auction_id, bundles_handle); - // 2. start new auction fut in the joinmap with the sequencer block hash as key - // TODO: - // 1. make new auction and save the handle - // 2. add the handle.run() to the joinmap? // 3. send new opt to the optimistic execution stream // TODO: move this into stream domain type // - this should just be part of the stream.next future probably, then we dont need the mpsc channel let _ = opts_to_exec_tx.send_timeout(opt, Duration::from_millis(100)); - // TODO: log the dumpingh of curr block? - curr_block = Some(next_block); }, // 2. executed block stream @@ -148,15 +165,15 @@ impl OptimisticExecutor { let commit = block::Committed::try_from_raw(raw).wrap_err("failed to parse incoming block commitment")?; - let next_block = curr_block - .map(|block| block.commit(commit).wrap_err("state transition failure")) - .expect("should only receive block commitment after optimistic block")?; - curr_block = Some(next_block); + if let Some(block) = curr_block { + curr_block = Some(block.commit(commit).wrap_err("state transition failure")?); + } + // 2. send commit signal to the auction so it will start the timer - if let Some(hash) = curr_block.as_ref().map(|block| block.sequencer_block_hash()) { + if let Some(hash) = curr_block.as_ref().map(|block| block.sequencer_block_hash()) { let auction_id = auction::Id::from_sequencer_block_hash(hash); - let handle = auction_handles.get_mut(&auction_id).ok_or_eyre("unable to get handle for current auction")?; + let handle = auction_oe_handles.get_mut(&auction_id).ok_or_eyre("unable to get handle for current auction")?; handle.block_commitment()?; } }, @@ -179,7 +196,7 @@ impl OptimisticExecutor { // 2. send executed signal to the auction so it will start pulling bundles if let Some(hash) = curr_block.as_ref().map(|block| block.sequencer_block_hash()) { let auction_id = auction::Id::from_sequencer_block_hash(hash); - let handle = auction_handles.get_mut(&auction_id).ok_or_eyre("unable to get handle for current auction")?; + let handle = auction_oe_handles.get_mut(&auction_id).ok_or_eyre("unable to get handle for current auction")?; handle.executed_block()?; } } @@ -191,8 +208,6 @@ impl OptimisticExecutor { // before execution, we can decide how to react here. // for example, we can drop all the bundles that are stuck in the channel and log a warning, // or we can kill the auction for that given block - - // 3. execute the execute_fut if not terminated? } } };