From 07fa64e1432826beb3f359e3a42cd1dbbd2c727d Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 29 Nov 2024 07:05:19 -0500 Subject: [PATCH] fixes a concurrency bug, adds more detailed errors. --- zebrad/src/components/mempool/downloads.rs | 2 +- .../lightwalletd/send_transaction_test.rs | 21 +++++++++++++++++++ zebrad/tests/common/lightwalletd/sync.rs | 2 +- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 45fd44a7c05..bfe6cab8570 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -121,7 +121,7 @@ pub enum TransactionDownloadVerifyError { #[error("transaction download / verification was cancelled")] Cancelled, - #[error("transaction did not pass consensus validation")] + #[error("transaction did not pass consensus validation, error: {0}")] Invalid(#[from] zebra_consensus::error::TransactionError), } diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 68a7ca6daf9..a3addeb2f5f 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -167,6 +167,7 @@ pub async fn run() -> Result<()> { ?lightwalletd_rpc_port, "connecting gRPC client to lightwalletd...", ); + zebrad.wait_for_stdout_line(None); let mut rpc_client = connect_to_lightwalletd(lightwalletd_rpc_port).await?; @@ -181,11 +182,14 @@ pub async fn run() -> Result<()> { .get_mempool_tx(Exclude { txid: vec![] }) .await? .into_inner(); + zebrad.wait_for_stdout_line(None); let mut has_tx_with_shielded_elements = false; let mut counter = 0; for block in blocks { + zebrad.wait_for_stdout_line(None); + let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block( zebrad, &mut rpc_client, @@ -195,6 +199,8 @@ pub async fn run() -> Result<()> { .await?; zebrad = zebrad_child; + zebrad.wait_for_stdout_line(None); + has_tx_with_shielded_elements |= has_shielded_elements; counter += count; @@ -218,6 +224,8 @@ pub async fn run() -> Result<()> { /// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. /// /// Returns the zebrad test child that's handling the RPC requests. + +#[tracing::instrument(skip_all)] async fn send_transactions_from_block( mut zebrad: TestChild, rpc_client: &mut CompactTxStreamerClient, @@ -264,6 +272,8 @@ async fn send_transactions_from_block( }; tracing::info!(?transaction_hash, "sending transaction..."); + // TODO: This may consume expected lines, try increasing the tracing buffer size instead if a full stdout pipe is causing panics. + zebrad.wait_for_stdout_line(None); let request = prepare_send_transaction_request(transaction.clone()); @@ -284,18 +294,24 @@ async fn send_transactions_from_block( // Fails if there are only coinbase transactions in the first 50 future blocks tracing::info!("waiting for mempool to verify some transactions..."); zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + } else { + zebrad.wait_for_stdout_line(None); } // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // tokio::time::sleep(std::time::Duration::from_secs(2)).await; + zebrad.wait_for_stdout_line(None); + sleep_until_lwd_last_mempool_refresh.await; + zebrad.wait_for_stdout_line(None); tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); let mut transactions_stream = rpc_client .get_mempool_tx(Exclude { txid: vec![] }) .await? .into_inner(); + zebrad.wait_for_stdout_line(None); // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. // If that happens, we skip the rest of the test. @@ -319,6 +335,8 @@ async fn send_transactions_from_block( tracing::info!("checking the mempool contains some of the sent transactions..."); let mut counter = 0; while let Some(tx) = transactions_stream.message().await? { + zebrad.wait_for_stdout_line(None); + let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); let hash = transaction::Hash::from_bytes_in_display_order(&hash); @@ -330,15 +348,18 @@ async fn send_transactions_from_block( counter += 1; } + zebrad.wait_for_stdout_line(None); // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + zebrad.wait_for_stdout_line(None); let mut _counter = 0; while let Some(_tx) = transaction_stream.message().await? { // TODO: check tx.data or tx.height here? _counter += 1; + zebrad.wait_for_stdout_line(None); } Ok((zebrad, has_tx_with_shielded_elements, counter)) diff --git a/zebrad/tests/common/lightwalletd/sync.rs b/zebrad/tests/common/lightwalletd/sync.rs index 8dce05a9150..3a55aedb5c2 100644 --- a/zebrad/tests/common/lightwalletd/sync.rs +++ b/zebrad/tests/common/lightwalletd/sync.rs @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync< wait_for_zebrad_mempool: bool, wait_for_zebrad_tip: bool, ) -> Result<(TestChild, TestChild

)> { - let is_zebrad_finished = AtomicBool::new(false); + let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip); let is_lightwalletd_finished = AtomicBool::new(false); let is_zebrad_finished = &is_zebrad_finished;