From c3dcc61ad104b2db5c17d0a4850d52827f7d0dab Mon Sep 17 00:00:00 2001 From: Ethan Oroshiba Date: Thu, 26 Sep 2024 09:07:05 -0500 Subject: [PATCH] fix(conductor): fix flaky soft_and_firm test (#1472) ## Summary Fixed flakiness in conductor `soft_and_firm::simple` test. ## Background Test would previously fail ~6% of the time due to receiving firm block before soft block, hence never fulfilling the soft update commitment mock. ## Changes - Split `simple()` into 2 tests: 1. Tests for receiving soft block first, and then firm block (and enforcing this order). 2. Tests for receiving firm block first, then ignoring the later soft block response at the same height. The tests for receiving the soft block at the next height. - made changes to `astria-grpc` to enable a delayed response ## Testing All tests passing. ## Related Issues closes #1143 --- .../tests/blackbox/helpers/macros.rs | 76 +++++-- .../tests/blackbox/helpers/mod.rs | 43 ++-- .../tests/blackbox/soft_and_firm.rs | 215 ++++++++++++++++-- crates/astria-grpc-mock/src/mock.rs | 12 +- crates/astria-grpc-mock/src/mock_server.rs | 11 +- crates/astria-grpc-mock/src/mock_set.rs | 18 +- crates/astria-grpc-mock/src/mounted_mock.rs | 14 +- crates/astria-grpc-mock/src/response.rs | 99 +++++--- 8 files changed, 375 insertions(+), 113 deletions(-) diff --git a/crates/astria-conductor/tests/blackbox/helpers/macros.rs b/crates/astria-conductor/tests/blackbox/helpers/macros.rs index df3520530c..b416db47ff 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/macros.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/macros.rs @@ -115,6 +115,20 @@ macro_rules! mount_celestia_blobs { celestia_height: $celestia_height:expr, sequencer_heights: [ $($sequencer_height:expr),+ ] $(,)? + ) => { + mount_celestia_blobs!( + $test_env, + celestia_height: $celestia_height, + sequencer_heights: [ $($sequencer_height),+ ], + delay: None, + ) + }; + ( + $test_env:ident, + celestia_height: $celestia_height:expr, + sequencer_heights: [ $($sequencer_height:expr),+ ], + delay: $delay:expr + $(,)? ) => {{ let blobs = $crate::helpers::make_blobs(&[ $( $sequencer_height ),+ ]); $test_env @@ -122,6 +136,7 @@ macro_rules! mount_celestia_blobs { $celestia_height, $crate::sequencer_namespace(), vec![blobs.header], + $delay, ) .await; $test_env @@ -129,6 +144,7 @@ macro_rules! mount_celestia_blobs { $celestia_height, $crate::rollup_namespace(), vec![blobs.rollup], + $delay, ) .await }}; @@ -177,6 +193,22 @@ macro_rules! mount_get_commitment_state { #[macro_export] macro_rules! mount_update_commitment_state { + ( + $test_env:ident, + firm: ( number: $firm_number:expr, hash: $firm_hash:expr, parent: $firm_parent:expr$(,)? ), + soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), + base_celestia_height: $base_celestia_height:expr + $(,)? + ) => { + mount_update_commitment_state!( + $test_env, + mock_name: None, + firm: ( number: $firm_number, hash: $firm_hash, parent: $firm_parent, ), + soft: ( number: $soft_number, hash: $soft_hash, parent: $soft_parent, ), + base_celestia_height: $base_celestia_height, + expected_calls: 1, + ) + }; ( $test_env:ident, mock_name: $mock_name:expr, @@ -184,6 +216,24 @@ macro_rules! mount_update_commitment_state { soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), base_celestia_height: $base_celestia_height:expr $(,)? + ) => { + mount_update_commitment_state!( + $test_env, + mock_name: $mock_name, + firm: ( number: $firm_number, hash: $firm_hash, parent: $firm_parent, ), + soft: ( number: $soft_number, hash: $soft_hash, parent: $soft_parent, ), + base_celestia_height: $base_celestia_height, + expected_calls: 1, + ) + }; + ( + $test_env:ident, + mock_name: $mock_name:expr, + firm: ( number: $firm_number:expr, hash: $firm_hash:expr, parent: $firm_parent:expr$(,)? ), + soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), + base_celestia_height: $base_celestia_height:expr, + expected_calls: $expected_calls:expr + $(,)? ) => { $test_env .mount_update_commitment_state( @@ -201,24 +251,10 @@ macro_rules! mount_update_commitment_state { ), base_celestia_height: $base_celestia_height, ), + $expected_calls, ) .await }; - ( - $test_env:ident, - firm: ( number: $firm_number:expr, hash: $firm_hash:expr, parent: $firm_parent:expr$(,)? ), - soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), - base_celestia_height: $base_celestia_height:expr - $(,)? - ) => { - mount_update_commitment_state!( - $test_env, - mock_name: None, - firm: ( number: $firm_number, hash: $firm_hash, parent: $firm_parent, ), - soft: ( number: $soft_number, hash: $soft_hash, parent: $soft_parent, ), - base_celestia_height: $base_celestia_height, - ) - }; } #[macro_export] @@ -270,7 +306,7 @@ macro_rules! mount_executed_block { #[macro_export] macro_rules! mount_get_filtered_sequencer_block { - ($test_env:ident, sequencer_height: $height:expr $(,)?) => { + ($test_env:ident, sequencer_height: $height:expr, delay: $delay:expr $(,)?) => { $test_env .mount_get_filtered_sequencer_block( ::astria_core::generated::sequencerblock::v1alpha1::GetFilteredSequencerBlockRequest { @@ -278,9 +314,17 @@ macro_rules! mount_get_filtered_sequencer_block { rollup_ids: vec![$crate::ROLLUP_ID.to_raw()], }, $crate::filtered_sequencer_block!(sequencer_height: $height), + $delay, ) .await; }; + ($test_env:ident, sequencer_height: $height:expr$(,)?) => { + mount_get_filtered_sequencer_block!( + $test_env, + sequencer_height: $height, + delay: Duration::from_secs(0), + ) + }; } #[macro_export] diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 01af511319..4cf9835176 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -19,11 +19,7 @@ use astria_core::{ }, primitive::v1::RollupId, }; -use astria_grpc_mock::{ - response::ResponseResult, - AnyMessage, - Respond, -}; +use astria_grpc_mock::response::error_response; use bytes::Bytes; use celestia_types::{ nmt::Namespace, @@ -198,6 +194,7 @@ impl TestConductor { celestia_height: u64, namespace: Namespace, blobs: Vec, + delay: Option, ) { use base64::prelude::*; use wiremock::{ @@ -209,6 +206,7 @@ impl TestConductor { Request, ResponseTemplate, }; + let delay = delay.unwrap_or(Duration::from_millis(0)); let namespace_params = BASE64_STANDARD.encode(namespace.as_bytes()); Mock::given(body_partial_json(json!({ "jsonrpc": "2.0", @@ -222,11 +220,13 @@ impl TestConductor { .respond_with(move |request: &Request| { let body: serde_json::Value = serde_json::from_slice(&request.body).unwrap(); let id = body.get("id"); - ResponseTemplate::new(200).set_body_json(json!({ - "jsonrpc": "2.0", - "id": id, - "result": blobs, - })) + ResponseTemplate::new(200) + .set_body_json(json!({ + "jsonrpc": "2.0", + "id": id, + "result": blobs, + })) + .set_delay(delay) }) .expect(1..) .mount(&self.mock_http) @@ -407,6 +407,7 @@ impl TestConductor { &self, expected_pbjson: S, response: FilteredSequencerBlock, + delay: Duration, ) { use astria_grpc_mock::{ matcher::message_partial_pbjson, @@ -417,7 +418,7 @@ impl TestConductor { "get_filtered_sequencer_block", message_partial_pbjson(&expected_pbjson), ) - .respond_with(constant_response(response)) + .respond_with(constant_response(response).set_delay(delay)) .expect(1..) .mount(&self.mock_grpc.mock_server) .await; @@ -427,6 +428,7 @@ impl TestConductor { &self, mock_name: Option<&str>, commitment_state: CommitmentState, + expected_calls: u64, ) -> astria_grpc_mock::MockGuard { use astria_core::generated::execution::v1alpha2::UpdateCommitmentStateRequest; use astria_grpc_mock::{ @@ -444,7 +446,7 @@ impl TestConductor { if let Some(name) = mock_name { mock = mock.with_name(name); } - mock.expect(1) + mock.expect(expected_calls) .mount_as_scoped(&self.mock_grpc.mock_server) .await } @@ -697,20 +699,3 @@ pub fn rollup_namespace() -> Namespace { pub fn sequencer_namespace() -> Namespace { astria_core::celestia::namespace_v0_from_sha256_of_bytes(SEQUENCER_CHAIN_ID.as_bytes()) } - -pub struct ErrorResponse { - status: tonic::Status, -} - -impl Respond for ErrorResponse { - fn respond(&self, _req: &tonic::Request) -> ResponseResult { - Err(self.status.clone()) - } -} - -#[must_use] -pub fn error_response(code: tonic::Code) -> ErrorResponse { - ErrorResponse { - status: tonic::Status::new(code, "error"), - } -} diff --git a/crates/astria-conductor/tests/blackbox/soft_and_firm.rs b/crates/astria-conductor/tests/blackbox/soft_and_firm.rs index 27cf904ab0..4d66792592 100644 --- a/crates/astria-conductor/tests/blackbox/soft_and_firm.rs +++ b/crates/astria-conductor/tests/blackbox/soft_and_firm.rs @@ -33,12 +33,8 @@ use crate::{ /// 4. block information for rollup number 1, sequencer height 2 is reconstructed from Celestia /// height 1 /// 5. the rollup's firm commitment state is updated (but without executing the block) -/// -/// NOTE: there is a potential race condition in this test in that the information could be first -/// retrieved from Celestia before Sequencer and executed against the rollup. In that case step 3. -/// would be skipped (no soft commitment update). #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn simple() { +async fn executes_soft_first_then_updates_firm() { let test_conductor = spawn_conductor(CommitLevel::SoftAndFirm).await; mount_get_genesis_info!( @@ -74,10 +70,51 @@ async fn simple() { height: 1u32, ); + mount_get_filtered_sequencer_block!( + test_conductor, + sequencer_height: 3, + ); + + let execute_block = mount_executed_block!( + test_conductor, + number: 2, + hash: [2; 64], + parent: [1; 64], + ); + + let update_commitment_state_soft = mount_update_commitment_state!( + test_conductor, + firm: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + soft: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + base_celestia_height: 1, + ); + + timeout( + Duration::from_millis(500), + join( + execute_block.wait_until_satisfied(), + update_commitment_state_soft.wait_until_satisfied(), + ), + ) + .await + .expect( + "Conductor should have executed the block and updated the soft commitment state within \ + 500ms", + ); + mount_celestia_blobs!( test_conductor, celestia_height: 1, sequencer_heights: [3], + delay: Some(Duration::from_millis(500)) ); mount_sequencer_commit!( @@ -87,11 +124,99 @@ async fn simple() { mount_sequencer_validator_set!(test_conductor, height: 2u32); - mount_get_filtered_sequencer_block!( + let update_commitment_state_firm = mount_update_commitment_state!( test_conductor, - sequencer_height: 3, + firm: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + soft: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + base_celestia_height: 1, + ); + + timeout( + Duration::from_millis(1000), + update_commitment_state_firm.wait_until_satisfied(), + ) + .await + .expect("conductor should have updated the firm commitment state within 1000ms"); +} + +/// Tests if a single block is executed and the rollup's state updated after first receiving a firm +/// block, ensuring that update commitment state is not called upon receiving a tardy soft block. +/// Then, ensures the conductor updates the state for the soft block at the next height. +/// +/// The following steps occur: +/// 1. Firm and soft blocks at the current height are mounted, the soft block with a 500ms delay to +/// allow for the firm block to be received first. +/// 2. The soft block for the next height is mounted with a 1000ms delay, so that execution and +/// state update of the current height happen before receipt of the next block. +/// 3. Mounts are made for firm and soft update commitment state calls, with the soft mount +/// expecting exactly 0 calls. +/// 4. 1000ms is allotted for the conductor to execute the block and update the firm commitment +/// state, noting that this allows time to test for an erroneously updated soft commitment state +/// before the conductor receives the next block. +/// 5. 2000ms is allotted for the conductor to execute the next block and update the soft commitment +/// state at the next height. +#[expect( + clippy::too_many_lines, + reason = "all mounts and test logic are necessary" +)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn executes_firm_then_soft_at_next_height() { + let test_conductor = spawn_conductor(CommitLevel::SoftAndFirm).await; + + mount_get_genesis_info!( + test_conductor, + sequencer_genesis_block_height: 1, + celestia_block_variance: 10, + ); + + mount_get_commitment_state!( + test_conductor, + firm: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + soft: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + base_celestia_height: 1, ); + mount_abci_info!( + test_conductor, + latest_sequencer_height: 4, + ); + + mount_sequencer_genesis!(test_conductor); + + mount_celestia_header_network_head!( + test_conductor, + height: 1u32, + ); + + mount_celestia_blobs!( + test_conductor, + celestia_height: 1, + sequencer_heights: [3], + ); + + mount_sequencer_commit!( + test_conductor, + height: 3u32, + ); + + mount_sequencer_validator_set!(test_conductor, height: 2u32); + let execute_block = mount_executed_block!( test_conductor, number: 2, @@ -99,8 +224,44 @@ async fn simple() { parent: [1; 64], ); - let update_commitment_state_soft = mount_update_commitment_state!( + // Mount soft block at current height with a slight delay + mount_get_filtered_sequencer_block!( + test_conductor, + sequencer_height: 3, + delay: Duration::from_millis(500), + ); + + // Mount soft block at next height with substantial delay + mount_get_filtered_sequencer_block!( + test_conductor, + sequencer_height: 4, + delay: Duration::from_millis(1000), + ); + + let update_commitment_state_firm = mount_update_commitment_state!( test_conductor, + firm: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + soft: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + base_celestia_height: 1, + ); + + // This guard's conditions will be checked when it is dropped, ensuring that there have been 0 + // calls to update the commitment state for the stale soft block. This is done instead of + // waiting for the guard to be satisfied because if we call `wait_until_satisfied` on it, it + // will succeed immediately and future erroneous calls will not be checked. It would be most + // ideal to mount this logic directly to the server, but this workaround functions with the + // current setup of the blackbox test helpers. + let _stale_update_soft_commitment_state = mount_update_commitment_state!( + test_conductor, + mock_name: "should_be_ignored_update_commitment_state_soft", firm: ( number: 1, hash: [1; 64], @@ -112,9 +273,30 @@ async fn simple() { parent: [1; 64], ), base_celestia_height: 1, + expected_calls: 0, ); - let update_commitment_state_firm = mount_update_commitment_state!( + timeout( + Duration::from_millis(1000), + join( + execute_block.wait_until_satisfied(), + update_commitment_state_firm.wait_until_satisfied(), + ), + ) + .await + .expect( + "Conductor should have executed the block and updated the firm commitment state within \ + 1000ms", + ); + + let execute_block = mount_executed_block!( + test_conductor, + number: 3, + hash: [3; 64], + parent: [2; 64], + ); + + let update_commitment_state_soft = mount_update_commitment_state!( test_conductor, firm: ( number: 2, @@ -122,25 +304,24 @@ async fn simple() { parent: [1; 64], ), soft: ( - number: 2, - hash: [2; 64], - parent: [1; 64], + number: 3, + hash: [3; 64], + parent: [2; 64], ), base_celestia_height: 1, ); timeout( - Duration::from_millis(1000), - join3( + Duration::from_millis(2000), + join( execute_block.wait_until_satisfied(), update_commitment_state_soft.wait_until_satisfied(), - update_commitment_state_firm.wait_until_satisfied(), ), ) .await .expect( - "conductor should have executed the block and updated the soft and firm commitment states \ - within 1000ms", + "conductor should have executed the block and updated the soft commitment state within \ + 2000ms", ); } diff --git a/crates/astria-grpc-mock/src/mock.rs b/crates/astria-grpc-mock/src/mock.rs index f0a7ff3e16..fd3fa5df79 100644 --- a/crates/astria-grpc-mock/src/mock.rs +++ b/crates/astria-grpc-mock/src/mock.rs @@ -8,12 +8,10 @@ use std::ops::{ RangeToInclusive, }; -use super::{ - response::Respond, - AnyMessage, -}; +use super::AnyMessage; use crate::{ mock_server::MockGuard, + response::ResponseTemplate, MockServer, }; @@ -32,7 +30,7 @@ impl Match for Matcher { pub struct Mock { pub(crate) rpc: &'static str, pub(crate) matchers: Vec, - pub(crate) response: Box, + pub(crate) response: ResponseTemplate, pub(crate) max_n_matches: Option, pub(crate) expectation_range: Times, pub(crate) name: Option, @@ -86,7 +84,7 @@ impl MockBuilder { self } - pub fn respond_with(self, rsp: impl Respond + 'static) -> Mock { + pub fn respond_with(self, rsp: ResponseTemplate) -> Mock { let Self { rpc, matchers, @@ -94,7 +92,7 @@ impl MockBuilder { Mock { rpc, matchers, - response: Box::new(rsp), + response: rsp, max_n_matches: None, name: None, expectation_range: Times(TimesEnum::Unbounded(RangeFull)), diff --git a/crates/astria-grpc-mock/src/mock_server.rs b/crates/astria-grpc-mock/src/mock_server.rs index bd74a5b8bc..09b6c8d535 100644 --- a/crates/astria-grpc-mock/src/mock_server.rs +++ b/crates/astria-grpc-mock/src/mock_server.rs @@ -54,7 +54,11 @@ impl MockServer { rpc: &'static str, req: tonic::Request, ) -> tonic::Result> { - self.state.write().await.handle_request(rpc, req) + let (response, delay) = self.state.write().await.handle_request(rpc, req); + if let Some(delay) = delay { + tokio::time::sleep(delay).await; + } + response } pub async fn register(&self, mock: Mock) { @@ -176,7 +180,10 @@ impl MockServerState { &mut self, rpc: &'static str, req: tonic::Request, - ) -> tonic::Result> { + ) -> ( + tonic::Result>, + Option, + ) { if let Some(received_requests) = &mut self.received_requests { received_requests.push((rpc, erase_request(clone_request(&req)).into())); } diff --git a/crates/astria-grpc-mock/src/mock_set.rs b/crates/astria-grpc-mock/src/mock_set.rs index 341d858a4f..59e4169ffb 100644 --- a/crates/astria-grpc-mock/src/mock_set.rs +++ b/crates/astria-grpc-mock/src/mock_set.rs @@ -55,29 +55,34 @@ impl MockSet { &mut self, rpc: &'static str, req: tonic::Request, - ) -> tonic::Result> { + ) -> ( + tonic::Result>, + Option, + ) { debug!(rpc, "handling request."); // perform erasure here so that it's not done in every single `Mock::matches` call. let erased = erase_request(req); let mut mock_response: Option>> = None; + let mut delay = None; for (mock, mock_state) in &mut self.mocks { if let MountedMockState::OutOfScope = mock_state { continue; } match mock.match_and_respond::(rpc, &erased) { - MockResult::NoMatch => continue, - MockResult::BadResponse(status) => { + (MockResult::NoMatch, _) => continue, + (MockResult::BadResponse(status), _) => { mock_response.replace(Err(status)); break; } - MockResult::Success(response) => { + (MockResult::Success(response), response_delay) => { mock_response.replace(response); + delay = response_delay; break; } } } - mock_response + let result = mock_response .ok_or_else(|| { let mut msg = "got unexpected request: ".to_string(); msg.push_str( @@ -86,7 +91,8 @@ impl MockSet { ); tonic::Status::not_found(msg) }) - .and_then(std::convert::identity) + .and_then(std::convert::identity); + (result, delay) } pub(crate) fn register(&mut self, mock: Mock) -> (Arc<(Notify, AtomicBool)>, MockId) { diff --git a/crates/astria-grpc-mock/src/mounted_mock.rs b/crates/astria-grpc-mock/src/mounted_mock.rs index efd14f52a2..5fed8b00ae 100644 --- a/crates/astria-grpc-mock/src/mounted_mock.rs +++ b/crates/astria-grpc-mock/src/mounted_mock.rs @@ -111,7 +111,7 @@ impl MountedMock { &mut self, rpc: &'static str, request: &Request, - ) -> MockResult { + ) -> (MockResult, Option) { let n_matches = u64::try_from(self.successful_responses.len() + self.bad_responses.len()).ok(); if self.inner.max_n_matches == n_matches @@ -122,16 +122,18 @@ impl MountedMock { .iter() .all(|matcher| matcher.matches(request)) { - return MockResult::NoMatch; + return (MockResult::NoMatch, None); } + let mut delay = None; let response = match self.inner.response.respond(request) { - Err(status) => { + (Err(status), _) => { self.successful_responses .push((clone_request(request), Err(status.clone()))); Ok(Err(status)) } - Ok(mock_response) => { + (Ok(mock_response), rsp_delay) => { + delay = rsp_delay; let (metadata, erased_message, extensions) = clone_response(&mock_response.inner).into_parts(); if let Ok(message) = erased_message.clone_box().into_any().downcast::() { @@ -173,8 +175,8 @@ impl MountedMock { self.notify.0.notify_waiters(); } match response { - Ok(ok) => MockResult::Success(ok), - Err(err) => MockResult::BadResponse(err), + Ok(ok) => (MockResult::Success(ok), delay), + Err(err) => (MockResult::BadResponse(err), None), } } diff --git a/crates/astria-grpc-mock/src/response.rs b/crates/astria-grpc-mock/src/response.rs index 3e05e669d2..6073d64c4b 100644 --- a/crates/astria-grpc-mock/src/response.rs +++ b/crates/astria-grpc-mock/src/response.rs @@ -1,4 +1,7 @@ -use std::marker::PhantomData; +use std::{ + marker::PhantomData, + time::Duration, +}; use super::{ clone_response, @@ -10,14 +13,17 @@ pub fn constant_response< T: erased_serde::Serialize + prost::Name + Clone + Default + Send + Sync + 'static, >( value: T, -) -> ConstantResponse { - ConstantResponse { - type_name: std::any::type_name::(), - response: erase_response(tonic::Response::new(value)), +) -> ResponseTemplate { + ResponseTemplate { + response: Box::new(ConstantResponse { + type_name: std::any::type_name::(), + response: erase_response(tonic::Response::new(value)), + }), + delay: None, } } -pub struct ConstantResponse { +struct ConstantResponse { type_name: &'static str, response: tonic::Response, } @@ -34,37 +40,30 @@ impl Respond for ConstantResponse { #[must_use] pub fn default_response< T: erased_serde::Serialize + prost::Name + Clone + Default + Send + Sync + 'static, ->() -> DefaultResponse { +>() -> ResponseTemplate { let response = T::default(); - DefaultResponse { - type_name: std::any::type_name::(), - response: erase_response(tonic::Response::new(response)), - } -} - -pub struct DefaultResponse { - type_name: &'static str, - response: tonic::Response, -} - -impl Respond for DefaultResponse { - fn respond(&self, _req: &tonic::Request) -> ResponseResult { - Ok(MockResponse { - type_name: self.type_name, - inner: clone_response(&self.response), - }) + ResponseTemplate { + response: Box::new(ConstantResponse { + type_name: std::any::type_name::(), + response: erase_response(tonic::Response::new(response)), + }), + delay: None, } } -pub fn dynamic_response(responder: F) -> DynamicResponse +pub fn dynamic_response(responder: F) -> ResponseTemplate where O: erased_serde::Serialize + prost::Name + Clone + 'static, - F: Fn(&I) -> O, + F: Send + Sync + 'static + Fn(&I) -> O, + I: Send + Sync + 'static, { - DynamicResponse { - type_name: std::any::type_name::(), - responder: Box::new(responder), - _phantom_data: PhantomData, + ResponseTemplate { + response: Box::new(DynamicResponse { + type_name: std::any::type_name::(), + responder: Box::new(responder), + _phantom_data: PhantomData, + }), + delay: None, } } @@ -74,6 +73,26 @@ pub struct DynamicResponse { _phantom_data: PhantomData<(I, O)>, } +struct ErrorResponse { + status: tonic::Status, +} + +impl Respond for ErrorResponse { + fn respond(&self, _req: &tonic::Request) -> ResponseResult { + Err(self.status.clone()) + } +} + +#[must_use] +pub fn error_response(code: tonic::Code) -> ResponseTemplate { + ResponseTemplate { + response: Box::new(ErrorResponse { + status: tonic::Status::new(code, "error"), + }), + delay: None, + } +} + impl Respond for DynamicResponse where I: Send + Sync + 'static, @@ -119,6 +138,26 @@ impl Clone for MockResponse { } } +pub struct ResponseTemplate { + response: Box, + delay: Option, +} + +impl ResponseTemplate { + pub(crate) fn respond( + &self, + req: &tonic::Request, + ) -> (ResponseResult, Option) { + (self.response.respond(req), self.delay) + } + + #[must_use] + pub fn set_delay(mut self, delay: Duration) -> Self { + self.delay = Some(delay); + self + } +} + pub trait Respond: Send + Sync { fn respond(&self, req: &tonic::Request) -> ResponseResult; }