From 45810bf5244fb7b154338af8d6ca88823b996d73 Mon Sep 17 00:00:00 2001 From: itamar Date: Mon, 4 Nov 2024 17:32:50 -0500 Subject: [PATCH] add bundle stream --- .../src/auction/allocation_rule.rs | 10 +++-- .../astria-auctioneer/src/auction/builder.rs | 8 +++- .../astria-auctioneer/src/auction/manager.rs | 21 +++++----- crates/astria-auctioneer/src/auction/mod.rs | 18 ++++---- .../src/block/block_commitment_stream.rs | 4 +- .../src/block/optimistic_stream.rs | 4 +- crates/astria-auctioneer/src/bundle/client.rs | 26 ++++++++++++ crates/astria-auctioneer/src/bundle/mod.rs | 1 + .../src/optimistic_executor/mod.rs | 42 ++++++++++++------- .../src/sequencer_grpc_client.rs | 21 ++++++---- 10 files changed, 107 insertions(+), 48 deletions(-) diff --git a/crates/astria-auctioneer/src/auction/allocation_rule.rs b/crates/astria-auctioneer/src/auction/allocation_rule.rs index 584df5b4e5..1b617c60f7 100644 --- a/crates/astria-auctioneer/src/auction/allocation_rule.rs +++ b/crates/astria-auctioneer/src/auction/allocation_rule.rs @@ -11,9 +11,13 @@ impl FirstPrice { } } - pub(crate) fn bid(&mut self, _bid: Bundle) -> bool { - // save the bid if its higher than self.highest_bid - unimplemented!() + pub(crate) fn bid(&mut self, bundle: Bundle) -> bool { + if bundle.bid() > self.highest_bid.as_ref().map_or(0, |b| b.bid()) { + self.highest_bid = Some(bundle); + true + } else { + false + } } pub(crate) fn highest_bid(self) -> Option { diff --git a/crates/astria-auctioneer/src/auction/builder.rs b/crates/astria-auctioneer/src/auction/builder.rs index 700d55b5d3..8b5592fc87 100644 --- a/crates/astria-auctioneer/src/auction/builder.rs +++ b/crates/astria-auctioneer/src/auction/builder.rs @@ -35,6 +35,8 @@ pub(crate) struct Builder { pub(crate) sequencer_key: SequencerKey, /// The denomination of the fee asset used in the sequencer transactions pub(crate) fee_asset_denomination: asset::Denom, + /// The chain ID used for sequencer transactions + pub(crate) sequencer_chain_id: String, /// The rollup ID used for `RollupDataSubmission` with the auction result pub(crate) rollup_id: RollupId, } @@ -51,6 +53,7 @@ impl Builder { fee_asset_denomination, rollup_id, sequencer_key, + sequencer_chain_id, } = self; let (executed_block_tx, executed_block_rx) = oneshot::channel(); @@ -59,7 +62,7 @@ impl Builder { // TODO: get the capacity from config or something instead of using a magic number let (new_bundles_tx, new_bundles_rx) = mpsc::channel(16); - let driver = Auction { + let auction = Auction { metrics, shutdown_token, sequencer_grpc_endpoint, @@ -72,6 +75,7 @@ impl Builder { latency_margin, sequencer_key, fee_asset_denomination, + sequencer_chain_id, rollup_id, }; @@ -82,7 +86,7 @@ impl Builder { start_timer_tx: Some(block_commitment_tx), abort_tx: Some(reorg_tx), }, - driver, + auction, ) } } diff --git a/crates/astria-auctioneer/src/auction/manager.rs b/crates/astria-auctioneer/src/auction/manager.rs index c34d549b22..043a29912e 100644 --- a/crates/astria-auctioneer/src/auction/manager.rs +++ b/crates/astria-auctioneer/src/auction/manager.rs @@ -1,11 +1,8 @@ use std::collections::HashMap; -use astria_core::{ - generated::bundle::v1alpha1::Bundle, - primitive::v1::{ - asset, - RollupId, - }, +use astria_core::primitive::v1::{ + asset, + RollupId, }; use astria_eyre::eyre::{ self, @@ -19,6 +16,7 @@ use tokio_util::{ use tracing::instrument; use super::{ + Bundle, Handle, Id, SequencerKey, @@ -102,6 +100,7 @@ impl Manager { auction_id, sequencer_key: self.sequencer_key.clone(), fee_asset_denomination: self.fee_asset_denomination.clone(), + sequencer_chain_id: self.sequencer_chain_id.clone(), rollup_id: self.rollup_id, } .build(); @@ -138,10 +137,12 @@ impl Manager { .wrap_err("failed to start processing bids") } - pub(crate) fn try_send_bundle(&mut self, _auction_id: Id, _bundle: Bundle) -> eyre::Result<()> { - unimplemented!() - // try to get the handle for the appropriate auction - // try send into that auction + pub(crate) fn try_send_bundle(&mut self, auction_id: Id, bundle: Bundle) -> eyre::Result<()> { + self.auction_handles + .get_mut(&auction_id) + .ok_or_eyre("unable to get handle for the given auction")? + .try_send_bundle(bundle) + .wrap_err("failed to add bundle to auction") } pub(crate) async fn join_next(&mut self) -> Option<(Id, eyre::Result<()>)> { diff --git a/crates/astria-auctioneer/src/auction/mod.rs b/crates/astria-auctioneer/src/auction/mod.rs index 3d625ce865..50ade7a5cc 100644 --- a/crates/astria-auctioneer/src/auction/mod.rs +++ b/crates/astria-auctioneer/src/auction/mod.rs @@ -16,6 +16,7 @@ use astria_eyre::eyre::{ OptionExt as _, }; pub(crate) use builder::Builder; +use telemetry::display::base64; use tokio::{ select, sync::{ @@ -24,6 +25,7 @@ use tokio::{ }, }; use tokio_util::sync::CancellationToken; +use tracing::info; use crate::{ bundle::Bundle, @@ -88,9 +90,7 @@ impl Handle { Ok(()) } - pub(crate) fn send_bundle_timeout(&mut self, bundle: Bundle) -> eyre::Result<()> { - const BUNDLE_TIMEOUT: Duration = Duration::from_millis(100); - + pub(crate) fn try_send_bundle(&mut self, bundle: Bundle) -> eyre::Result<()> { self.new_bundles_tx .try_send(bundle) .wrap_err("bid channel full")?; @@ -125,6 +125,8 @@ struct Auction { sequencer_key: SequencerKey, /// Fee asset for submitting transactions fee_asset_denomination: asset::Denom, + /// The chain ID used for sequencer transactions + sequencer_chain_id: String, /// Rollup ID to submit the auction result to rollup_id: RollupId, } @@ -132,7 +134,7 @@ struct Auction { impl Auction { pub(crate) async fn run(mut self) -> eyre::Result<()> { let mut latency_margin_timer = None; - let allocation_rule = FirstPrice::new(); + let mut allocation_rule = FirstPrice::new(); let mut auction_is_open = false; let mut nonce_fetch: Option>> = None; @@ -182,9 +184,11 @@ impl Auction { })); } - // TODO: new bundles from the bundle stream if auction exists? - // - add the bid to the auction if executed - // Some(bid) = &mut self.bids_rx, if auction_is_open() => {} + Some(bundle) = self.new_bundles_rx.recv(), if auction_is_open => { + if allocation_rule.bid(bundle.clone()) { + info!(auction.id = %base64(self.auction_id), bundle.bid = %bundle.bid(), "new highest bid") + } + } } }; diff --git a/crates/astria-auctioneer/src/block/block_commitment_stream.rs b/crates/astria-auctioneer/src/block/block_commitment_stream.rs index 3261ba77a0..9dc41d7ee2 100644 --- a/crates/astria-auctioneer/src/block/block_commitment_stream.rs +++ b/crates/astria-auctioneer/src/block/block_commitment_stream.rs @@ -12,7 +12,7 @@ use futures::{ }; use super::Commitment; -use crate::sequencer_grpc_client::SequencerGrpcClient; +use crate::sequencer_grpc_client::OptimisticBlockClient; /// A stream for receiving committed blocks from the sequencer. pub(crate) struct BlockCommitmentStream { @@ -20,7 +20,7 @@ pub(crate) struct BlockCommitmentStream { } impl BlockCommitmentStream { - pub(crate) async fn connect(mut sequencer_client: SequencerGrpcClient) -> eyre::Result { + pub(crate) async fn connect(mut sequencer_client: OptimisticBlockClient) -> eyre::Result { let committed_stream_client = sequencer_client .get_block_commitment_stream() .await diff --git a/crates/astria-auctioneer/src/block/optimistic_stream.rs b/crates/astria-auctioneer/src/block/optimistic_stream.rs index d54df5c8aa..e30950ea1c 100644 --- a/crates/astria-auctioneer/src/block/optimistic_stream.rs +++ b/crates/astria-auctioneer/src/block/optimistic_stream.rs @@ -15,7 +15,7 @@ use futures::{ }; use super::Optimistic; -use crate::sequencer_grpc_client::SequencerGrpcClient; +use crate::sequencer_grpc_client::OptimisticBlockClient; /// A stream for receiving optimistic blocks from the sequencer. pub(crate) struct OptimisticBlockStream { @@ -25,7 +25,7 @@ pub(crate) struct OptimisticBlockStream { impl OptimisticBlockStream { pub(crate) async fn connect( rollup_id: RollupId, - mut sequencer_client: SequencerGrpcClient, + mut sequencer_client: OptimisticBlockClient, ) -> eyre::Result { let optimistic_stream_client = sequencer_client .get_optimistic_block_stream(rollup_id) diff --git a/crates/astria-auctioneer/src/bundle/client.rs b/crates/astria-auctioneer/src/bundle/client.rs index 4381095f41..2adfceb26c 100644 --- a/crates/astria-auctioneer/src/bundle/client.rs +++ b/crates/astria-auctioneer/src/bundle/client.rs @@ -10,9 +10,14 @@ use astria_core::generated::bundle::v1alpha1::{ }; use astria_eyre::eyre::{ self, + OptionExt, WrapErr as _, }; use axum::http::Uri; +use futures::{ + Stream, + StreamExt, +}; use tonic::transport::Endpoint; use tracing::{ warn, @@ -21,6 +26,8 @@ use tracing::{ }; use tryhard::backoff_strategies::ExponentialBackoff; +use super::Bundle; + pub(crate) struct BundleClient { inner: BundleServiceClient, uri: Uri, @@ -106,3 +113,22 @@ impl BundleStream { }) } } + +impl Stream for BundleStream { + type Item = eyre::Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let raw = futures::ready!(self.client.poll_next_unpin(cx)) + .ok_or_eyre("stream has been closed")? + .wrap_err("received gRPC error")? + .bundle + .ok_or_eyre("bundle stream response did not contain bundle")?; + + let bundle = Bundle::try_from_raw(raw).wrap_err("failed to parse raw Bundle")?; + + std::task::Poll::Ready(Some(Ok(bundle))) + } +} diff --git a/crates/astria-auctioneer/src/bundle/mod.rs b/crates/astria-auctioneer/src/bundle/mod.rs index a99aa04f11..5758504387 100644 --- a/crates/astria-auctioneer/src/bundle/mod.rs +++ b/crates/astria-auctioneer/src/bundle/mod.rs @@ -16,6 +16,7 @@ use astria_eyre::eyre::{ WrapErr as _, }; use bytes::Bytes; +pub(crate) use client::BundleStream; use prost::Message as _; mod client; diff --git a/crates/astria-auctioneer/src/optimistic_executor/mod.rs b/crates/astria-auctioneer/src/optimistic_executor/mod.rs index c32abdcde9..02a2e126de 100644 --- a/crates/astria-auctioneer/src/optimistic_executor/mod.rs +++ b/crates/astria-auctioneer/src/optimistic_executor/mod.rs @@ -30,7 +30,11 @@ use crate::{ executed_stream::ExecutedBlockStream, optimistic_stream::OptimisticBlockStream, }, - sequencer_grpc_client::SequencerGrpcClient, + bundle::{ + Bundle, + BundleStream, + }, + sequencer_grpc_client::OptimisticBlockClient, sequencer_key::SequencerKey, }; @@ -63,7 +67,7 @@ impl Startup { sequencer_chain_id, } = self; - let sequencer_client = SequencerGrpcClient::new(&sequencer_grpc_endpoint) + let sequencer_client = OptimisticBlockClient::new(&sequencer_grpc_endpoint) .wrap_err("failed to initialize sequencer grpc client")?; // TODO: have a connect streams helper? let mut optimistic_blocks = @@ -76,10 +80,13 @@ impl Startup { .wrap_err("failed to initialize block commitment stream")?; let (blocks_to_execute_handle, executed_blocks) = - ExecutedBlockStream::connect(rollup_id, rollup_grpc_endpoint) + ExecutedBlockStream::connect(rollup_id, rollup_grpc_endpoint.clone()) .await .wrap_err("failed to initialize executed block stream")?; + let bundle_stream = BundleStream::connect(rollup_grpc_endpoint) + .await + .wrap_err("failed to initialize bundle stream")?; // let bundle_stream = BundleServiceClient::new(bundle_service_grpc_url) // .wrap_err("failed to initialize bundle service grpc client")?; @@ -112,6 +119,7 @@ impl Startup { block_commitments, executed_blocks, blocks_to_execute_handle, + bundle_stream, auctions, current_block, }) @@ -125,6 +133,7 @@ pub(crate) struct Running { block_commitments: BlockCommitmentStream, executed_blocks: ExecutedBlockStream, blocks_to_execute_handle: block::executed_stream::Handle, + bundle_stream: BundleStream, auctions: auction::Manager, current_block: block::Current, } @@ -163,17 +172,11 @@ impl Running { self.executed_block_handler(executed_block).wrap_err("failed to handle executed block")?; } - // 2. forward bundles from bundle stream into the correct auction fut - // - bundles will build up in the channel into the auction until the executed signal is - // sent to the auction fut. so if backpressure builds up here, i.e. bids arrive way - // before execution, we can decide how to react here. - // for example, we can drop all the bundles that are stuck in the channel and log a warning, - // or we can kill the auction for that given block - // Some(res) = bundle_stream.next() => { - // filter by auction id/block hash - // push into correct mpsc channel - // TODO: document how backpressure is handled here - // } + Some(res) = self.bundle_stream.next() => { + let bundle = res.wrap_err("failed to get bundle")?; + + self.bundle_handler(bundle).wrap_err("failed to handle bundle")?; + } } } }; @@ -254,6 +257,17 @@ impl Running { Ok(()) } + #[instrument(skip(self), fields(auction.id = %base64(self.current_block.sequencer_block_hash())))] + fn bundle_handler(&mut self, bundle: Bundle) -> eyre::Result<()> { + let auction_id = + auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash()); + self.auctions + .try_send_bundle(auction_id, bundle) + .wrap_err("failed to submit bundle to auction")?; + + Ok(()) + } + async fn shutdown(self) { self.shutdown_token.cancel(); } diff --git a/crates/astria-auctioneer/src/sequencer_grpc_client.rs b/crates/astria-auctioneer/src/sequencer_grpc_client.rs index 6b2a9253be..3b2b0b93a2 100644 --- a/crates/astria-auctioneer/src/sequencer_grpc_client.rs +++ b/crates/astria-auctioneer/src/sequencer_grpc_client.rs @@ -1,12 +1,15 @@ use std::time::Duration; use astria_core::{ - generated::sequencerblock::optimisticblock::v1alpha1::{ - optimistic_block_service_client::OptimisticBlockServiceClient, - GetBlockCommitmentStreamRequest, - GetBlockCommitmentStreamResponse, - GetOptimisticBlockStreamRequest, - GetOptimisticBlockStreamResponse, + generated::{ + primitive::v1::Address, + sequencerblock::optimisticblock::v1alpha1::{ + optimistic_block_service_client::OptimisticBlockServiceClient, + GetBlockCommitmentStreamRequest, + GetBlockCommitmentStreamResponse, + GetOptimisticBlockStreamRequest, + GetOptimisticBlockStreamResponse, + }, }, primitive::v1::RollupId, }; @@ -29,12 +32,12 @@ use tryhard::backoff_strategies::ExponentialBackoff; /// Wraps the gRPC client for the Sequencer service that wraps client calls with `tryhard`. #[derive(Debug, Clone)] -pub(crate) struct SequencerGrpcClient { +pub(crate) struct OptimisticBlockClient { inner: OptimisticBlockServiceClient, uri: Uri, } -impl SequencerGrpcClient { +impl OptimisticBlockClient { pub(crate) fn new(sequencer_uri: &str) -> eyre::Result { let uri = sequencer_uri .parse::() @@ -49,6 +52,7 @@ impl SequencerGrpcClient { }) } + // TODO: this should probably be separated from the tryhard logic and put in an extension trait #[instrument(skip_all, fields( uri = %self.uri, %rollup_id, @@ -78,6 +82,7 @@ impl SequencerGrpcClient { Ok(stream) } + // TODO: this should probably be separated from the tryhard logic and put in an extension trait #[instrument(skip_all, fields( uri = %self.uri, err,