From 009764a09ad2770a71efd9044122040eeee92aec Mon Sep 17 00:00:00 2001 From: Bharath Date: Thu, 26 Sep 2024 20:22:11 +0530 Subject: [PATCH] separate bundle service into OptimisticExecutionService and BundleService --- .../src/generated/astria.bundle.v1alpha1.rs | 297 +++++++++++++++--- .../astria/bundle/v1alpha1/bundle.proto | 6 +- 2 files changed, 264 insertions(+), 39 deletions(-) diff --git a/crates/astria-core/src/generated/astria.bundle.v1alpha1.rs b/crates/astria-core/src/generated/astria.bundle.v1alpha1.rs index a56ccf5eab..70380401fb 100644 --- a/crates/astria-core/src/generated/astria.bundle.v1alpha1.rs +++ b/crates/astria-core/src/generated/astria.bundle.v1alpha1.rs @@ -99,16 +99,15 @@ impl ::prost::Name for Bundle { } /// Generated client implementations. #[cfg(feature = "client")] -pub mod bundle_service_client { +pub mod optimistic_execution_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; use tonic::codegen::http::Uri; - /// Geth will serve this to the Auctioneer #[derive(Debug, Clone)] - pub struct BundleServiceClient { + pub struct OptimisticExecutionServiceClient { inner: tonic::client::Grpc, } - impl BundleServiceClient { + impl OptimisticExecutionServiceClient { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where @@ -119,7 +118,7 @@ pub mod bundle_service_client { Ok(Self::new(conn)) } } - impl BundleServiceClient + impl OptimisticExecutionServiceClient where T: tonic::client::GrpcService, T::Error: Into, @@ -137,7 +136,7 @@ pub mod bundle_service_client { pub fn with_interceptor( inner: T, interceptor: F, - ) -> BundleServiceClient> + ) -> OptimisticExecutionServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, @@ -151,7 +150,9 @@ pub mod bundle_service_client { http::Request, >>::Error: Into + Send + Sync, { - BundleServiceClient::new(InterceptedService::new(inner, interceptor)) + OptimisticExecutionServiceClient::new( + InterceptedService::new(inner, interceptor), + ) } /// Compress requests with the given encoding. /// @@ -208,18 +209,106 @@ pub mod bundle_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/astria.bundle.v1alpha1.BundleService/StreamExecuteOptimisticBlock", + "/astria.bundle.v1alpha1.OptimisticExecutionService/StreamExecuteOptimisticBlock", ); let mut req = request.into_streaming_request(); req.extensions_mut() .insert( GrpcMethod::new( - "astria.bundle.v1alpha1.BundleService", + "astria.bundle.v1alpha1.OptimisticExecutionService", "StreamExecuteOptimisticBlock", ), ); self.inner.streaming(req, path, codec).await } + } +} +/// Generated client implementations. +#[cfg(feature = "client")] +pub mod bundle_service_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct BundleServiceClient { + inner: tonic::client::Grpc, + } + impl BundleServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl BundleServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> BundleServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + BundleServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } /// A bundle submitter requests bundles given a new optimistic Sequencer block, /// and receives a stream of potential bundles for submission, until either a timeout /// or the connection is closed by the client. @@ -257,12 +346,12 @@ pub mod bundle_service_client { } /// Generated server implementations. #[cfg(feature = "server")] -pub mod bundle_service_server { +pub mod optimistic_execution_service_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with BundleServiceServer. + /// Generated trait containing gRPC methods that should be implemented for use with OptimisticExecutionServiceServer. #[async_trait] - pub trait BundleService: Send + Sync + 'static { + pub trait OptimisticExecutionService: Send + Sync + 'static { /// Server streaming response type for the StreamExecuteOptimisticBlock method. type StreamExecuteOptimisticBlockStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result< @@ -283,26 +372,9 @@ pub mod bundle_service_server { tonic::Response, tonic::Status, >; - /// Server streaming response type for the StreamBundles method. - type StreamBundlesStream: tonic::codegen::tokio_stream::Stream< - Item = std::result::Result, - > - + Send - + 'static; - /// A bundle submitter requests bundles given a new optimistic Sequencer block, - /// and receives a stream of potential bundles for submission, until either a timeout - /// or the connection is closed by the client. - async fn stream_bundles( - self: std::sync::Arc, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; } - /// Geth will serve this to the Auctioneer #[derive(Debug)] - pub struct BundleServiceServer { + pub struct OptimisticExecutionServiceServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, @@ -310,7 +382,7 @@ pub mod bundle_service_server { max_encoding_message_size: Option, } struct _Inner(Arc); - impl BundleServiceServer { + impl OptimisticExecutionServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -362,9 +434,10 @@ pub mod bundle_service_server { self } } - impl tonic::codegen::Service> for BundleServiceServer + impl tonic::codegen::Service> + for OptimisticExecutionServiceServer where - T: BundleService, + T: OptimisticExecutionService, B: Body + Send + 'static, B::Error: Into + Send + 'static, { @@ -380,11 +453,15 @@ pub mod bundle_service_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/astria.bundle.v1alpha1.BundleService/StreamExecuteOptimisticBlock" => { + "/astria.bundle.v1alpha1.OptimisticExecutionService/StreamExecuteOptimisticBlock" => { #[allow(non_camel_case_types)] - struct StreamExecuteOptimisticBlockSvc(pub Arc); + struct StreamExecuteOptimisticBlockSvc< + T: OptimisticExecutionService, + >( + pub Arc, + ); impl< - T: BundleService, + T: OptimisticExecutionService, > tonic::server::StreamingService< super::StreamExecuteOptimisticBlockRequest, > for StreamExecuteOptimisticBlockSvc { @@ -402,7 +479,7 @@ pub mod bundle_service_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::stream_execute_optimistic_block( + ::stream_execute_optimistic_block( inner, request, ) @@ -434,6 +511,152 @@ pub mod bundle_service_server { }; Box::pin(fut) } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for OptimisticExecutionServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService + for OptimisticExecutionServiceServer { + const NAME: &'static str = "astria.bundle.v1alpha1.OptimisticExecutionService"; + } +} +/// Generated server implementations. +#[cfg(feature = "server")] +pub mod bundle_service_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with BundleServiceServer. + #[async_trait] + pub trait BundleService: Send + Sync + 'static { + /// Server streaming response type for the StreamBundles method. + type StreamBundlesStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + Send + + 'static; + /// A bundle submitter requests bundles given a new optimistic Sequencer block, + /// and receives a stream of potential bundles for submission, until either a timeout + /// or the connection is closed by the client. + async fn stream_bundles( + self: std::sync::Arc, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct BundleServiceServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl BundleServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for BundleServiceServer + where + T: BundleService, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { "/astria.bundle.v1alpha1.BundleService/StreamBundles" => { #[allow(non_camel_case_types)] struct StreamBundlesSvc(pub Arc); diff --git a/proto/executionapis/astria/bundle/v1alpha1/bundle.proto b/proto/executionapis/astria/bundle/v1alpha1/bundle.proto index 1e469e32a3..c077f28a0e 100644 --- a/proto/executionapis/astria/bundle/v1alpha1/bundle.proto +++ b/proto/executionapis/astria/bundle/v1alpha1/bundle.proto @@ -53,11 +53,13 @@ message Bundle { bytes prev_rollup_block_hash = 4; } -// Geth will serve this to the Auctioneer -service BundleService { +service OptimisticExecutionService { // Stream blocks from the Auctioneer to Geth for optimistic execution. Geth will stream back // metadata from the executed blocks. rpc StreamExecuteOptimisticBlock(stream StreamExecuteOptimisticBlockRequest) returns (stream StreamExecuteOptimisticBlockResponse); +} + +service BundleService { // A bundle submitter requests bundles given a new optimistic Sequencer block, // and receives a stream of potential bundles for submission, until either a timeout // or the connection is closed by the client.