From 7a5f323ceedba8c3781b585ed76aee41f09a8564 Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 22 Nov 2024 00:53:47 -0500 Subject: [PATCH 01/16] Fixes bug in send transaction test --- .../lightwalletd/send_transaction_test.rs | 212 ++++++++++-------- 1 file changed, 117 insertions(+), 95 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index bee6cf78356..70c912dd28c 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -21,10 +21,12 @@ use std::{cmp::min, sync::Arc, time::Duration}; use color_eyre::eyre::Result; use zebra_chain::{ + block::Block, parameters::Network::{self, *}, serialization::ZcashSerialize, transaction::{self, Transaction}, }; +use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; @@ -36,6 +38,7 @@ use crate::common::{ sync::wait_for_zebrad_and_lightwalletd_sync, wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude}, }, + regtest::MiningRpcMethods, sync::LARGE_CHECKPOINT_TIMEOUT, test_type::TestType::{self, *}, }; @@ -85,11 +88,19 @@ pub async fn run() -> Result<()> { "running gRPC send transaction test using lightwalletd & zebrad", ); - let transactions = - load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?; + let mut count = 0; + let blocks: Vec = + get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) + .await? + .into_iter() + .take_while(|block| { + count += block.transactions.len(); + count <= max_sent_transactions() + }) + .collect(); tracing::info!( - transaction_count = ?transactions.len(), + blocks_count = ?blocks.len(), partial_sync_path = ?zebrad_state_path, "got transactions to send, spawning isolated zebrad...", ); @@ -113,6 +124,8 @@ pub async fn run() -> Result<()> { let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port"); + let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address); + tracing::info!( ?test_type, ?zebra_rpc_address, @@ -164,110 +177,119 @@ pub async fn run() -> Result<()> { .await? .into_inner(); - // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: - // - // - // So we need to wait much longer than that here. - let sleep_until_lwd_last_mempool_refresh = - tokio::time::sleep(std::time::Duration::from_secs(4)); - - let transaction_hashes: Vec = - transactions.iter().map(|tx| tx.hash()).collect(); - - tracing::info!( - transaction_count = ?transactions.len(), - ?transaction_hashes, - "connected gRPC client to lightwalletd, sending transactions...", - ); - - let mut has_tx_with_shielded_elements = false; - for transaction in transactions { - let transaction_hash = transaction.hash(); - - // See - has_tx_with_shielded_elements |= transaction.version() >= 4 - && (transaction.has_shielded_inputs() || transaction.has_shielded_outputs()); - - let expected_response = wallet_grpc::SendResponse { - error_code: 0, - error_message: format!("\"{transaction_hash}\""), - }; - - tracing::info!(?transaction_hash, "sending transaction..."); - - let request = prepare_send_transaction_request(transaction); - - let response = rpc_client.send_transaction(request).await?.into_inner(); - - assert_eq!(response, expected_response); - } - - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; - // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() - // - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - sleep_until_lwd_last_mempool_refresh.await; - - tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); - let mut transactions_stream = rpc_client - .get_mempool_tx(Exclude { txid: vec![] }) - .await? - .into_inner(); + for block in blocks { + // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: + // + // + // So we need to wait much longer than that here. + let sleep_until_lwd_last_mempool_refresh = + tokio::time::sleep(std::time::Duration::from_secs(4)); - // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. - // If that happens, we skip the rest of the test. - tracing::info!("checking if lightwalletd has queried the mempool..."); - - // We need a short timeout here, because sometimes this message is not logged. - zebrad = zebrad.with_timeout(Duration::from_secs(60)); - let tx_log = - zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); - // Reset the failed timeout and give the rest of the test enough time to finish. - #[allow(unused_assignments)] - { - zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); - } + let transactions = &block.transactions; - if tx_log.is_err() { - tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); - return Ok(()); - } + let transaction_hashes: Vec = + transactions.iter().map(|tx| tx.hash()).collect(); - tracing::info!("checking the mempool contains some of the sent transactions..."); - let mut counter = 0; - while let Some(tx) = transactions_stream.message().await? { - let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); - let hash = transaction::Hash::from_bytes_in_display_order(&hash); + tracing::info!( + transaction_count = ?transactions.len(), + ?transaction_hashes, + "connected gRPC client to lightwalletd, sending transactions...", + ); - assert!( - transaction_hashes.contains(&hash), - "unexpected transaction {hash:?}\n\ + let mut has_tx_with_shielded_elements = false; + for transaction in transactions { + let transaction_hash = transaction.hash(); + + // See + has_tx_with_shielded_elements |= transaction.version() >= 4 + && (transaction.has_shielded_inputs() || transaction.has_shielded_outputs()); + + let expected_response = wallet_grpc::SendResponse { + error_code: 0, + error_message: format!("\"{transaction_hash}\""), + }; + + tracing::info!(?transaction_hash, "sending transaction..."); + + let request = prepare_send_transaction_request(transaction.clone()); + + let response = rpc_client.send_transaction(request).await?.into_inner(); + + assert_eq!(response, expected_response); + } + + // Check if some transaction is sent to mempool, + // Fails if there are only coinbase transactions in the first 50 future blocks + tracing::info!("waiting for mempool to verify some transactions..."); + zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + + // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() + // + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + sleep_until_lwd_last_mempool_refresh.await; + + tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); + let mut transactions_stream = rpc_client + .get_mempool_tx(Exclude { txid: vec![] }) + .await? + .into_inner(); + + // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. + // If that happens, we skip the rest of the test. + tracing::info!("checking if lightwalletd has queried the mempool..."); + + // We need a short timeout here, because sometimes this message is not logged. + zebrad = zebrad.with_timeout(Duration::from_secs(60)); + let tx_log = + zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); + // Reset the failed timeout and give the rest of the test enough time to finish. + #[allow(unused_assignments)] + { + zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); + } + + if tx_log.is_err() { + tracing::info!( + "lightwalletd didn't query the mempool, skipping mempool contents checks" + ); + return Ok(()); + } + + tracing::info!("checking the mempool contains some of the sent transactions..."); + let mut counter = 0; + while let Some(tx) = transactions_stream.message().await? { + let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); + let hash = transaction::Hash::from_bytes_in_display_order(&hash); + + assert!( + transaction_hashes.contains(&hash), + "unexpected transaction {hash:?}\n\ in isolated mempool: {tx:?}", - ); + ); - counter += 1; - } + counter += 1; + } - // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements - assert!( + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + // + // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, + // only check if a transaction from the first block has shielded elements + assert!( !has_tx_with_shielded_elements || counter >= 1, "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" ); - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. - tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); - let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. + tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); + let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + + let mut _counter = 0; + while let Some(_tx) = transaction_stream.message().await? { + // TODO: check tx.data or tx.height here? + _counter += 1; + } - let mut _counter = 0; - while let Some(_tx) = transaction_stream.message().await? { - // TODO: check tx.data or tx.height here? - _counter += 1; + zebrad_rpc_client.submit_block(block).await?; } Ok(()) From 0ecf779d63c43cbb50dcb2b5a154799ef344f91e Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 22 Nov 2024 14:32:37 -0500 Subject: [PATCH 02/16] fixes new bug in send_transaction_test --- zebrad/tests/common/lightwalletd/send_transaction_test.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 70c912dd28c..5ad08ede06d 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -185,7 +185,11 @@ pub async fn run() -> Result<()> { let sleep_until_lwd_last_mempool_refresh = tokio::time::sleep(std::time::Duration::from_secs(4)); - let transactions = &block.transactions; + let transactions: Vec<_> = block + .transactions + .iter() + .filter(|tx| !tx.is_coinbase()) + .collect(); let transaction_hashes: Vec = transactions.iter().map(|tx| tx.hash()).collect(); From cecbb65bd3c069ad3eea4910e3ed1f84c7b65bc7 Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 22 Nov 2024 14:41:12 -0500 Subject: [PATCH 03/16] Removes unused `load_transactions_from_future_blocks` and factors out code for sending transactions to its own fn --- .../lightwalletd/send_transaction_test.rs | 246 +++++++++--------- 1 file changed, 120 insertions(+), 126 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 5ad08ede06d..c329f8a292e 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -22,12 +22,13 @@ use color_eyre::eyre::Result; use zebra_chain::{ block::Block, - parameters::Network::{self, *}, + parameters::Network::*, serialization::ZcashSerialize, transaction::{self, Transaction}, }; use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; +use zebra_test::prelude::TestChild; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; use crate::common::{ @@ -36,11 +37,14 @@ use crate::common::{ lightwalletd::{ can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc, sync::wait_for_zebrad_and_lightwalletd_sync, - wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude}, + wallet_grpc::{ + self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd, + Empty, Exclude, + }, }, regtest::MiningRpcMethods, sync::LARGE_CHECKPOINT_TIMEOUT, - test_type::TestType::{self, *}, + test_type::TestType::*, }; /// The maximum number of transactions we want to send in the test. @@ -178,150 +182,140 @@ pub async fn run() -> Result<()> { .into_inner(); for block in blocks { - // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: - // - // - // So we need to wait much longer than that here. - let sleep_until_lwd_last_mempool_refresh = - tokio::time::sleep(std::time::Duration::from_secs(4)); - - let transactions: Vec<_> = block - .transactions - .iter() - .filter(|tx| !tx.is_coinbase()) - .collect(); + zebrad = send_transactions_from_block(zebrad, &mut rpc_client, &zebrad_rpc_client, block) + .await?; + } - let transaction_hashes: Vec = - transactions.iter().map(|tx| tx.hash()).collect(); + Ok(()) +} - tracing::info!( - transaction_count = ?transactions.len(), - ?transaction_hashes, - "connected gRPC client to lightwalletd, sending transactions...", - ); +/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions +/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. +/// +/// Returns the zebrad test child that's handling the RPC requests. +#[tracing::instrument] +async fn send_transactions_from_block( + mut zebrad: TestChild, + rpc_client: &mut CompactTxStreamerClient, + zebrad_rpc_client: &RpcRequestClient, + block: Block, +) -> Result> { + // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: + // + // + // So we need to wait much longer than that here. + let sleep_until_lwd_last_mempool_refresh = + tokio::time::sleep(std::time::Duration::from_secs(4)); + + let transactions: Vec<_> = block + .transactions + .iter() + .filter(|tx| !tx.is_coinbase()) + .collect(); - let mut has_tx_with_shielded_elements = false; - for transaction in transactions { - let transaction_hash = transaction.hash(); + let transaction_hashes: Vec = + transactions.iter().map(|tx| tx.hash()).collect(); - // See - has_tx_with_shielded_elements |= transaction.version() >= 4 - && (transaction.has_shielded_inputs() || transaction.has_shielded_outputs()); + tracing::info!( + transaction_count = ?transactions.len(), + ?transaction_hashes, + "connected gRPC client to lightwalletd, sending transactions...", + ); - let expected_response = wallet_grpc::SendResponse { - error_code: 0, - error_message: format!("\"{transaction_hash}\""), - }; + let mut has_tx_with_shielded_elements = false; + for transaction in transactions { + let transaction_hash = transaction.hash(); - tracing::info!(?transaction_hash, "sending transaction..."); + // See + has_tx_with_shielded_elements |= transaction.version() >= 4 + && (transaction.has_shielded_inputs() || transaction.has_shielded_outputs()); - let request = prepare_send_transaction_request(transaction.clone()); + let expected_response = wallet_grpc::SendResponse { + error_code: 0, + error_message: format!("\"{transaction_hash}\""), + }; - let response = rpc_client.send_transaction(request).await?.into_inner(); + tracing::info!(?transaction_hash, "sending transaction..."); - assert_eq!(response, expected_response); - } + let request = prepare_send_transaction_request(transaction.clone()); - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + let response = rpc_client.send_transaction(request).await?.into_inner(); - // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() - // - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - sleep_until_lwd_last_mempool_refresh.await; + assert_eq!(response, expected_response); + } - tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); - let mut transactions_stream = rpc_client - .get_mempool_tx(Exclude { txid: vec![] }) - .await? - .into_inner(); - - // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. - // If that happens, we skip the rest of the test. - tracing::info!("checking if lightwalletd has queried the mempool..."); - - // We need a short timeout here, because sometimes this message is not logged. - zebrad = zebrad.with_timeout(Duration::from_secs(60)); - let tx_log = - zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); - // Reset the failed timeout and give the rest of the test enough time to finish. - #[allow(unused_assignments)] - { - zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); - } - - if tx_log.is_err() { - tracing::info!( - "lightwalletd didn't query the mempool, skipping mempool contents checks" - ); - return Ok(()); - } - - tracing::info!("checking the mempool contains some of the sent transactions..."); - let mut counter = 0; - while let Some(tx) = transactions_stream.message().await? { - let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); - let hash = transaction::Hash::from_bytes_in_display_order(&hash); - - assert!( - transaction_hashes.contains(&hash), - "unexpected transaction {hash:?}\n\ - in isolated mempool: {tx:?}", - ); + // Check if some transaction is sent to mempool, + // Fails if there are only coinbase transactions in the first 50 future blocks + tracing::info!("waiting for mempool to verify some transactions..."); + zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; - counter += 1; - } + // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() + // + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + sleep_until_lwd_last_mempool_refresh.await; - // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements - assert!( - !has_tx_with_shielded_elements || counter >= 1, - "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" - ); + tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); + let mut transactions_stream = rpc_client + .get_mempool_tx(Exclude { txid: vec![] }) + .await? + .into_inner(); - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. - tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); - let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. + // If that happens, we skip the rest of the test. + tracing::info!("checking if lightwalletd has queried the mempool..."); + + // We need a short timeout here, because sometimes this message is not logged. + zebrad = zebrad.with_timeout(Duration::from_secs(60)); + let tx_log = + zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); + // Reset the failed timeout and give the rest of the test enough time to finish. + #[allow(unused_assignments)] + { + zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); + } - let mut _counter = 0; - while let Some(_tx) = transaction_stream.message().await? { - // TODO: check tx.data or tx.height here? - _counter += 1; - } + if tx_log.is_err() { + tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); + return Ok(zebrad); + } + + tracing::info!("checking the mempool contains some of the sent transactions..."); + let mut counter = 0; + while let Some(tx) = transactions_stream.message().await? { + let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); + let hash = transaction::Hash::from_bytes_in_display_order(&hash); + + assert!( + transaction_hashes.contains(&hash), + "unexpected transaction {hash:?}\n\ + in isolated mempool: {tx:?}", + ); - zebrad_rpc_client.submit_block(block).await?; + counter += 1; } - Ok(()) -} + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + // + // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, + // only check if a transaction from the first block has shielded elements + assert!( + !has_tx_with_shielded_elements || counter >= 1, + "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" + ); -/// Loads transactions from a few block(s) after the chain tip of the cached state. -/// -/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk -/// in the `ZEBRA_CACHED_STATE_DIR`. -/// -/// ## Panics -/// -/// If the provided `test_type` doesn't need an rpc server and cached state -#[tracing::instrument] -async fn load_transactions_from_future_blocks( - network: Network, - test_type: TestType, - test_name: &str, -) -> Result>> { - let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) - .await? - .into_iter() - .flat_map(|block| block.transactions) - .filter(|transaction| !transaction.is_coinbase()) - .take(max_sent_transactions()) - .collect(); + // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. + tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); + let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + + let mut _counter = 0; + while let Some(_tx) = transaction_stream.message().await? { + // TODO: check tx.data or tx.height here? + _counter += 1; + } + + zebrad_rpc_client.submit_block(block).await?; - Ok(transactions) + Ok(zebrad) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent. From aa5ca26d518c04e13bdf0af569542b4234ee63ed Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 22 Nov 2024 14:42:32 -0500 Subject: [PATCH 04/16] corrects tx count updates to exclude coinbase txs --- zebrad/tests/common/lightwalletd/send_transaction_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index c329f8a292e..9d65929c2e9 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -98,7 +98,7 @@ pub async fn run() -> Result<()> { .await? .into_iter() .take_while(|block| { - count += block.transactions.len(); + count += block.transactions.len() - 1; count <= max_sent_transactions() }) .collect(); From 26fab551bb25030c27ccd766b22db462556772a1 Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 22 Nov 2024 14:43:54 -0500 Subject: [PATCH 05/16] fixes formatting --- zebrad/tests/common/lightwalletd/send_transaction_test.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 9d65929c2e9..c48949c159d 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -299,9 +299,10 @@ async fn send_transactions_from_block( // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, // only check if a transaction from the first block has shielded elements assert!( - !has_tx_with_shielded_elements || counter >= 1, - "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" - ); + !has_tx_with_shielded_elements || counter >= 1, + "failed to read v4+ transactions with shielded elements \ + from future blocks in mempool via lightwalletd" + ); // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); From 8bf1e64713b77dd5a40496122c7f2f67a9b3188b Mon Sep 17 00:00:00 2001 From: ar Date: Tue, 26 Nov 2024 14:22:07 -0500 Subject: [PATCH 06/16] Calls zebra's sendrawtransaction method if lwd's send_transaction() return an error for more detailed error info --- .../lightwalletd/send_transaction_test.rs | 72 ++++++++++++++----- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index c48949c159d..1ade56b7b7e 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -17,8 +17,9 @@ //! already been seen in a block. use std::{cmp::min, sync::Arc, time::Duration}; +use tower::BoxError; -use color_eyre::eyre::Result; +use color_eyre::eyre::{eyre, Result}; use zebra_chain::{ block::Block, @@ -181,11 +182,29 @@ pub async fn run() -> Result<()> { .await? .into_inner(); + let mut has_tx_with_shielded_elements = false; + let mut counter = 0; + for block in blocks { - zebrad = send_transactions_from_block(zebrad, &mut rpc_client, &zebrad_rpc_client, block) - .await?; + let (zebrad_child, has_shielded_elements, count) = + send_transactions_from_block(zebrad, &mut rpc_client, &zebrad_rpc_client, block) + .await?; + + zebrad = zebrad_child; + has_tx_with_shielded_elements |= has_shielded_elements; + counter += count; } + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + // + // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, + // only check if a transaction from the first block has shielded elements + assert!( + !has_tx_with_shielded_elements || counter >= 1, + "failed to read v4+ transactions with shielded elements \ + from future blocks in mempool via lightwalletd" + ); + Ok(()) } @@ -199,7 +218,7 @@ async fn send_transactions_from_block( rpc_client: &mut CompactTxStreamerClient, zebrad_rpc_client: &RpcRequestClient, block: Block, -) -> Result> { +) -> Result<(TestChild, bool, usize)> { // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: // // @@ -239,9 +258,16 @@ async fn send_transactions_from_block( let request = prepare_send_transaction_request(transaction.clone()); - let response = rpc_client.send_transaction(request).await?.into_inner(); - - assert_eq!(response, expected_response); + match rpc_client.send_transaction(request).await { + Ok(response) => assert_eq!(response.into_inner(), expected_response), + Err(err) => { + tracing::warn!(?err, "failed to send transaction"); + zebrad_rpc_client + .send_transaction(transaction) + .await + .map_err(|e| eyre!(e))?; + } + }; } // Check if some transaction is sent to mempool, @@ -276,7 +302,7 @@ async fn send_transactions_from_block( if tx_log.is_err() { tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); - return Ok(zebrad); + return Ok((zebrad, has_tx_with_shielded_elements, 0)); } tracing::info!("checking the mempool contains some of the sent transactions..."); @@ -294,16 +320,6 @@ async fn send_transactions_from_block( counter += 1; } - // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements - assert!( - !has_tx_with_shielded_elements || counter >= 1, - "failed to read v4+ transactions with shielded elements \ - from future blocks in mempool via lightwalletd" - ); - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); @@ -316,7 +332,7 @@ async fn send_transactions_from_block( zebrad_rpc_client.submit_block(block).await?; - Ok(zebrad) + Ok((zebrad, has_tx_with_shielded_elements, counter)) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent. @@ -328,3 +344,21 @@ fn prepare_send_transaction_request(transaction: Arc) -> wallet_grp height: 0, } } + +trait SendTransactionMethod { + async fn send_transaction( + &self, + transaction: &Arc, + ) -> Result; +} + +impl SendTransactionMethod for RpcRequestClient { + async fn send_transaction( + &self, + transaction: &Arc, + ) -> Result { + let tx_data = hex::encode(transaction.zcash_serialize_to_vec()?); + self.json_result_from_call("sendrawtransaction", format!(r#"["{tx_data}"]"#)) + .await + } +} From 44679293b5f8f1d725ae277811bf0d495ed5c795 Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 26 Nov 2024 17:09:48 -0500 Subject: [PATCH 07/16] removes instrument --- zebrad/tests/common/lightwalletd/send_transaction_test.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 1ade56b7b7e..2de3dc43bf1 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -212,7 +212,6 @@ pub async fn run() -> Result<()> { /// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. /// /// Returns the zebrad test child that's handling the RPC requests. -#[tracing::instrument] async fn send_transactions_from_block( mut zebrad: TestChild, rpc_client: &mut CompactTxStreamerClient, From 55bcd27deafadd43df295029ba1d701491e9a4b3 Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 26 Nov 2024 17:23:08 -0500 Subject: [PATCH 08/16] avoids panic when a future block has only a coinbase transaction --- .../lightwalletd/send_transaction_test.rs | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 2de3dc43bf1..ca4bc4f9677 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -186,13 +186,19 @@ pub async fn run() -> Result<()> { let mut counter = 0; for block in blocks { - let (zebrad_child, has_shielded_elements, count) = - send_transactions_from_block(zebrad, &mut rpc_client, &zebrad_rpc_client, block) - .await?; + let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block( + zebrad, + &mut rpc_client, + &zebrad_rpc_client, + block.clone(), + ) + .await?; zebrad = zebrad_child; has_tx_with_shielded_elements |= has_shielded_elements; counter += count; + + zebrad_rpc_client.submit_block(block).await?; } // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. @@ -231,6 +237,10 @@ async fn send_transactions_from_block( .filter(|tx| !tx.is_coinbase()) .collect(); + if !transactions.is_empty() { + return Ok((zebrad, false, 0)); + } + let transaction_hashes: Vec = transactions.iter().map(|tx| tx.hash()).collect(); @@ -241,7 +251,7 @@ async fn send_transactions_from_block( ); let mut has_tx_with_shielded_elements = false; - for transaction in transactions { + for &transaction in &transactions { let transaction_hash = transaction.hash(); // See @@ -329,8 +339,6 @@ async fn send_transactions_from_block( _counter += 1; } - zebrad_rpc_client.submit_block(block).await?; - Ok((zebrad, has_tx_with_shielded_elements, counter)) } From 65982f7926746762c3bb611b6e2c9b111431a309 Mon Sep 17 00:00:00 2001 From: Arya Date: Thu, 28 Nov 2024 14:34:15 -0500 Subject: [PATCH 09/16] fixes check for gossip log (only happens when 10 txs have been added --- .../tests/common/lightwalletd/send_transaction_test.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index ca4bc4f9677..68a7ca6daf9 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -279,10 +279,12 @@ async fn send_transactions_from_block( }; } - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + if transactions.len() >= 10 { + // Check if some transaction is sent to mempool, + // Fails if there are only coinbase transactions in the first 50 future blocks + tracing::info!("waiting for mempool to verify some transactions..."); + zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + } // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // From 07fa64e1432826beb3f359e3a42cd1dbbd2c727d Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 29 Nov 2024 07:05:19 -0500 Subject: [PATCH 10/16] fixes a concurrency bug, adds more detailed errors. --- zebrad/src/components/mempool/downloads.rs | 2 +- .../lightwalletd/send_transaction_test.rs | 21 +++++++++++++++++++ zebrad/tests/common/lightwalletd/sync.rs | 2 +- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 45fd44a7c05..bfe6cab8570 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -121,7 +121,7 @@ pub enum TransactionDownloadVerifyError { #[error("transaction download / verification was cancelled")] Cancelled, - #[error("transaction did not pass consensus validation")] + #[error("transaction did not pass consensus validation, error: {0}")] Invalid(#[from] zebra_consensus::error::TransactionError), } diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 68a7ca6daf9..a3addeb2f5f 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -167,6 +167,7 @@ pub async fn run() -> Result<()> { ?lightwalletd_rpc_port, "connecting gRPC client to lightwalletd...", ); + zebrad.wait_for_stdout_line(None); let mut rpc_client = connect_to_lightwalletd(lightwalletd_rpc_port).await?; @@ -181,11 +182,14 @@ pub async fn run() -> Result<()> { .get_mempool_tx(Exclude { txid: vec![] }) .await? .into_inner(); + zebrad.wait_for_stdout_line(None); let mut has_tx_with_shielded_elements = false; let mut counter = 0; for block in blocks { + zebrad.wait_for_stdout_line(None); + let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block( zebrad, &mut rpc_client, @@ -195,6 +199,8 @@ pub async fn run() -> Result<()> { .await?; zebrad = zebrad_child; + zebrad.wait_for_stdout_line(None); + has_tx_with_shielded_elements |= has_shielded_elements; counter += count; @@ -218,6 +224,8 @@ pub async fn run() -> Result<()> { /// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. /// /// Returns the zebrad test child that's handling the RPC requests. + +#[tracing::instrument(skip_all)] async fn send_transactions_from_block( mut zebrad: TestChild, rpc_client: &mut CompactTxStreamerClient, @@ -264,6 +272,8 @@ async fn send_transactions_from_block( }; tracing::info!(?transaction_hash, "sending transaction..."); + // TODO: This may consume expected lines, try increasing the tracing buffer size instead if a full stdout pipe is causing panics. + zebrad.wait_for_stdout_line(None); let request = prepare_send_transaction_request(transaction.clone()); @@ -284,18 +294,24 @@ async fn send_transactions_from_block( // Fails if there are only coinbase transactions in the first 50 future blocks tracing::info!("waiting for mempool to verify some transactions..."); zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + } else { + zebrad.wait_for_stdout_line(None); } // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // tokio::time::sleep(std::time::Duration::from_secs(2)).await; + zebrad.wait_for_stdout_line(None); + sleep_until_lwd_last_mempool_refresh.await; + zebrad.wait_for_stdout_line(None); tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); let mut transactions_stream = rpc_client .get_mempool_tx(Exclude { txid: vec![] }) .await? .into_inner(); + zebrad.wait_for_stdout_line(None); // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. // If that happens, we skip the rest of the test. @@ -319,6 +335,8 @@ async fn send_transactions_from_block( tracing::info!("checking the mempool contains some of the sent transactions..."); let mut counter = 0; while let Some(tx) = transactions_stream.message().await? { + zebrad.wait_for_stdout_line(None); + let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); let hash = transaction::Hash::from_bytes_in_display_order(&hash); @@ -330,15 +348,18 @@ async fn send_transactions_from_block( counter += 1; } + zebrad.wait_for_stdout_line(None); // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + zebrad.wait_for_stdout_line(None); let mut _counter = 0; while let Some(_tx) = transaction_stream.message().await? { // TODO: check tx.data or tx.height here? _counter += 1; + zebrad.wait_for_stdout_line(None); } Ok((zebrad, has_tx_with_shielded_elements, counter)) diff --git a/zebrad/tests/common/lightwalletd/sync.rs b/zebrad/tests/common/lightwalletd/sync.rs index 8dce05a9150..3a55aedb5c2 100644 --- a/zebrad/tests/common/lightwalletd/sync.rs +++ b/zebrad/tests/common/lightwalletd/sync.rs @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync< wait_for_zebrad_mempool: bool, wait_for_zebrad_tip: bool, ) -> Result<(TestChild, TestChild

)> { - let is_zebrad_finished = AtomicBool::new(false); + let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip); let is_lightwalletd_finished = AtomicBool::new(false); let is_zebrad_finished = &is_zebrad_finished; From f624891597d109af2783cd6f4420327691e1a675 Mon Sep 17 00:00:00 2001 From: ar Date: Fri, 29 Nov 2024 16:15:13 -0500 Subject: [PATCH 11/16] removes unnecessary wait_for_stdout calls and fixes condition for early return --- zebrad/src/components/mempool/downloads.rs | 2 +- .../lightwalletd/send_transaction_test.rs | 24 +------------------ 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index bfe6cab8570..35b6cf876e3 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -121,7 +121,7 @@ pub enum TransactionDownloadVerifyError { #[error("transaction download / verification was cancelled")] Cancelled, - #[error("transaction did not pass consensus validation, error: {0}")] + #[error("transaction did not pass consensus validation, error: {0:?}")] Invalid(#[from] zebra_consensus::error::TransactionError), } diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index a3addeb2f5f..e5ed18ea1b7 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -167,7 +167,6 @@ pub async fn run() -> Result<()> { ?lightwalletd_rpc_port, "connecting gRPC client to lightwalletd...", ); - zebrad.wait_for_stdout_line(None); let mut rpc_client = connect_to_lightwalletd(lightwalletd_rpc_port).await?; @@ -182,14 +181,11 @@ pub async fn run() -> Result<()> { .get_mempool_tx(Exclude { txid: vec![] }) .await? .into_inner(); - zebrad.wait_for_stdout_line(None); let mut has_tx_with_shielded_elements = false; let mut counter = 0; for block in blocks { - zebrad.wait_for_stdout_line(None); - let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block( zebrad, &mut rpc_client, @@ -199,7 +195,6 @@ pub async fn run() -> Result<()> { .await?; zebrad = zebrad_child; - zebrad.wait_for_stdout_line(None); has_tx_with_shielded_elements |= has_shielded_elements; counter += count; @@ -208,9 +203,6 @@ pub async fn run() -> Result<()> { } // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements assert!( !has_tx_with_shielded_elements || counter >= 1, "failed to read v4+ transactions with shielded elements \ @@ -245,7 +237,7 @@ async fn send_transactions_from_block( .filter(|tx| !tx.is_coinbase()) .collect(); - if !transactions.is_empty() { + if transactions.is_empty() { return Ok((zebrad, false, 0)); } @@ -272,8 +264,6 @@ async fn send_transactions_from_block( }; tracing::info!(?transaction_hash, "sending transaction..."); - // TODO: This may consume expected lines, try increasing the tracing buffer size instead if a full stdout pipe is causing panics. - zebrad.wait_for_stdout_line(None); let request = prepare_send_transaction_request(transaction.clone()); @@ -294,24 +284,17 @@ async fn send_transactions_from_block( // Fails if there are only coinbase transactions in the first 50 future blocks tracing::info!("waiting for mempool to verify some transactions..."); zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; - } else { - zebrad.wait_for_stdout_line(None); } // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - zebrad.wait_for_stdout_line(None); - sleep_until_lwd_last_mempool_refresh.await; - zebrad.wait_for_stdout_line(None); tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); let mut transactions_stream = rpc_client .get_mempool_tx(Exclude { txid: vec![] }) .await? .into_inner(); - zebrad.wait_for_stdout_line(None); // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. // If that happens, we skip the rest of the test. @@ -335,8 +318,6 @@ async fn send_transactions_from_block( tracing::info!("checking the mempool contains some of the sent transactions..."); let mut counter = 0; while let Some(tx) = transactions_stream.message().await? { - zebrad.wait_for_stdout_line(None); - let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); let hash = transaction::Hash::from_bytes_in_display_order(&hash); @@ -348,18 +329,15 @@ async fn send_transactions_from_block( counter += 1; } - zebrad.wait_for_stdout_line(None); // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); - zebrad.wait_for_stdout_line(None); let mut _counter = 0; while let Some(_tx) = transaction_stream.message().await? { // TODO: check tx.data or tx.height here? _counter += 1; - zebrad.wait_for_stdout_line(None); } Ok((zebrad, has_tx_with_shielded_elements, counter)) From bf263d619a649780cbd53bf63a6ac7e769ac7ed6 Mon Sep 17 00:00:00 2001 From: Arya Date: Mon, 9 Dec 2024 14:08:40 -0500 Subject: [PATCH 12/16] Fixes issue around missing stdout line --- .../lightwalletd/send_transaction_test.rs | 65 ++++++------------- 1 file changed, 19 insertions(+), 46 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index e5ed18ea1b7..d9ec51dee34 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -16,7 +16,7 @@ //! were obtained. This is to ensure that zebra does not reject the transactions because they have //! already been seen in a block. -use std::{cmp::min, sync::Arc, time::Duration}; +use std::{cmp::min, sync::Arc}; use tower::BoxError; use color_eyre::eyre::{eyre, Result}; @@ -29,7 +29,6 @@ use zebra_chain::{ }; use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; -use zebra_test::prelude::TestChild; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; use crate::common::{ @@ -44,7 +43,6 @@ use crate::common::{ }, }, regtest::MiningRpcMethods, - sync::LARGE_CHECKPOINT_TIMEOUT, test_type::TestType::*, }; @@ -152,7 +150,7 @@ pub async fn run() -> Result<()> { "spawned lightwalletd connected to zebrad, waiting for them both to sync...", ); - let (_lightwalletd, mut zebrad) = wait_for_zebrad_and_lightwalletd_sync( + let (_lightwalletd, _zebrad) = wait_for_zebrad_and_lightwalletd_sync( lightwalletd, lightwalletd_rpc_port, zebrad, @@ -186,20 +184,20 @@ pub async fn run() -> Result<()> { let mut counter = 0; for block in blocks { - let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block( - zebrad, - &mut rpc_client, - &zebrad_rpc_client, - block.clone(), - ) - .await?; - - zebrad = zebrad_child; + let (has_shielded_elements, count) = + send_transactions_from_block(&mut rpc_client, &zebrad_rpc_client, block.clone()) + .await?; has_tx_with_shielded_elements |= has_shielded_elements; counter += count; - zebrad_rpc_client.submit_block(block).await?; + tracing::info!( + height = ?block.coinbase_height(), + "submitting block at height" + ); + + let submit_block_response = zebrad_rpc_client.submit_block(block).await; + tracing::info!(?submit_block_response, "submitted block"); } // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. @@ -219,11 +217,10 @@ pub async fn run() -> Result<()> { #[tracing::instrument(skip_all)] async fn send_transactions_from_block( - mut zebrad: TestChild, rpc_client: &mut CompactTxStreamerClient, zebrad_rpc_client: &RpcRequestClient, block: Block, -) -> Result<(TestChild, bool, usize)> { +) -> Result<(bool, usize)> { // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: // // @@ -238,7 +235,7 @@ async fn send_transactions_from_block( .collect(); if transactions.is_empty() { - return Ok((zebrad, false, 0)); + return Ok((false, 0)); } let transaction_hashes: Vec = @@ -271,21 +268,16 @@ async fn send_transactions_from_block( Ok(response) => assert_eq!(response.into_inner(), expected_response), Err(err) => { tracing::warn!(?err, "failed to send transaction"); - zebrad_rpc_client + let send_tx_rsp = zebrad_rpc_client .send_transaction(transaction) .await - .map_err(|e| eyre!(e))?; + .map_err(|e| eyre!(e)); + + tracing::warn!(?send_tx_rsp, "failed to send tx twice"); } }; } - if transactions.len() >= 10 { - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; - } - // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // sleep_until_lwd_last_mempool_refresh.await; @@ -296,25 +288,6 @@ async fn send_transactions_from_block( .await? .into_inner(); - // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. - // If that happens, we skip the rest of the test. - tracing::info!("checking if lightwalletd has queried the mempool..."); - - // We need a short timeout here, because sometimes this message is not logged. - zebrad = zebrad.with_timeout(Duration::from_secs(60)); - let tx_log = - zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); - // Reset the failed timeout and give the rest of the test enough time to finish. - #[allow(unused_assignments)] - { - zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); - } - - if tx_log.is_err() { - tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); - return Ok((zebrad, has_tx_with_shielded_elements, 0)); - } - tracing::info!("checking the mempool contains some of the sent transactions..."); let mut counter = 0; while let Some(tx) = transactions_stream.message().await? { @@ -340,7 +313,7 @@ async fn send_transactions_from_block( _counter += 1; } - Ok((zebrad, has_tx_with_shielded_elements, counter)) + Ok((has_tx_with_shielded_elements, counter)) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent. From 06d163b410b4ead14c89d65f781eba0923b0d619 Mon Sep 17 00:00:00 2001 From: Arya Date: Mon, 9 Dec 2024 18:14:14 -0500 Subject: [PATCH 13/16] Fixes bug around expected tx ids and removes outdated TODO --- .../lightwalletd/send_transaction_test.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index d9ec51dee34..6ac031e491b 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -16,7 +16,7 @@ //! were obtained. This is to ensure that zebra does not reject the transactions because they have //! already been seen in a block. -use std::{cmp::min, sync::Arc}; +use std::{cmp::min, collections::HashSet, sync::Arc}; use tower::BoxError; use color_eyre::eyre::{eyre, Result}; @@ -180,13 +180,18 @@ pub async fn run() -> Result<()> { .await? .into_inner(); + let mut transaction_hashes = HashSet::new(); let mut has_tx_with_shielded_elements = false; let mut counter = 0; for block in blocks { - let (has_shielded_elements, count) = - send_transactions_from_block(&mut rpc_client, &zebrad_rpc_client, block.clone()) - .await?; + let (has_shielded_elements, count) = send_transactions_from_block( + &mut rpc_client, + &zebrad_rpc_client, + block.clone(), + &mut transaction_hashes, + ) + .await?; has_tx_with_shielded_elements |= has_shielded_elements; counter += count; @@ -220,6 +225,7 @@ async fn send_transactions_from_block( rpc_client: &mut CompactTxStreamerClient, zebrad_rpc_client: &RpcRequestClient, block: Block, + transaction_hashes: &mut HashSet, ) -> Result<(bool, usize)> { // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: // @@ -238,8 +244,7 @@ async fn send_transactions_from_block( return Ok((false, 0)); } - let transaction_hashes: Vec = - transactions.iter().map(|tx| tx.hash()).collect(); + transaction_hashes.extend(transactions.iter().map(|tx| tx.hash())); tracing::info!( transaction_count = ?transactions.len(), @@ -303,7 +308,6 @@ async fn send_transactions_from_block( counter += 1; } - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); From 5cd91550b95aaa8f0387fc235f292477aa586a4e Mon Sep 17 00:00:00 2001 From: Arya Date: Mon, 9 Dec 2024 18:51:46 -0500 Subject: [PATCH 14/16] Fixes issue with expected ZF funding stream address balance in post-NU6 chains --- .../common/lightwalletd/wallet_grpc_test.rs | 111 +++++++++--------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs index 702bb740142..a6eafd6b2c1 100644 --- a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs +++ b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs @@ -37,11 +37,14 @@ use color_eyre::eyre::Result; use hex_literal::hex; use zebra_chain::{ - block::Block, - parameters::Network, - parameters::NetworkUpgrade::{Nu5, Sapling}, + block::{Block, Height}, + parameters::{ + Network, + NetworkUpgrade::{Nu5, Sapling}, + }, serialization::ZcashDeserializeInto, }; +use zebra_consensus::funding_stream_address; use zebra_state::state_database_format_version_in_code; use crate::common::{ @@ -291,60 +294,58 @@ pub async fn run() -> Result<()> { // For the provided address in the first 10 blocks there are 10 transactions in the mainnet assert_eq!(10, counter); - // Call `GetTaddressBalance` with the ZF funding stream address - let balance = rpc_client - .get_taddress_balance(AddressList { - addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()], - }) - .await? - .into_inner(); - - // With ZFND or Major Grants funding stream address, the balance will always be greater than zero, - // because new coins are created in each block - assert!(balance.value_zat > 0); - - // Call `GetTaddressBalanceStream` with the ZF funding stream address as a stream argument - let zf_stream_address = Address { - address: "t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string(), - }; - - let balance_zf = rpc_client - .get_taddress_balance_stream(tokio_stream::iter(vec![zf_stream_address.clone()])) - .await? - .into_inner(); - - // With ZFND funding stream address, the balance will always be greater than zero, - // because new coins are created in each block - assert!(balance_zf.value_zat > 0); - - // Call `GetTaddressBalanceStream` with the MG funding stream address as a stream argument - let mg_stream_address = Address { - address: "t3XyYW8yBFRuMnfvm5KLGFbEVz25kckZXym".to_string(), - }; - - let balance_mg = rpc_client - .get_taddress_balance_stream(tokio_stream::iter(vec![mg_stream_address.clone()])) - .await? - .into_inner(); - - // With Major Grants funding stream address, the balance will always be greater than zero, - // because new coins are created in each block - assert!(balance_mg.value_zat > 0); + let lwd_tip_height: Height = u32::try_from(block_tip.height) + .expect("should be below max block height") + .try_into() + .expect("should be below max block height"); + + let mut all_stream_addresses = Vec::new(); + let mut all_balance_streams = Vec::new(); + for &fs_receiver in network.funding_streams(lwd_tip_height).recipients().keys() { + let Some(fs_address) = funding_stream_address(lwd_tip_height, &network, fs_receiver) else { + // Skip if the lightwalletd tip height is above the funding stream end height. + continue; + }; + + tracing::info!(?fs_address, "getting balance for active fs address"); + + // Call `GetTaddressBalance` with the active funding stream address. + let balance = rpc_client + .get_taddress_balance(AddressList { + addresses: vec![fs_address.to_string()], + }) + .await? + .into_inner(); + + // Call `GetTaddressBalanceStream` with the active funding stream address as a stream argument. + let stream_address = Address { + address: fs_address.to_string(), + }; + + let balance_stream = rpc_client + .get_taddress_balance_stream(tokio_stream::iter(vec![stream_address.clone()])) + .await? + .into_inner(); + + // With any active funding stream address, the balance will always be greater than zero for blocks + // below the funding stream end height because new coins are created in each block. + assert!(balance.value_zat > 0); + assert!(balance_stream.value_zat > 0); + + all_stream_addresses.push(stream_address); + all_balance_streams.push(balance_stream.value_zat); + } - // Call `GetTaddressBalanceStream` with both, the ZFND and the MG funding stream addresses as a stream argument - let balance_both = rpc_client - .get_taddress_balance_stream(tokio_stream::iter(vec![ - zf_stream_address, - mg_stream_address, - ])) - .await? - .into_inner(); + if let Some(expected_total_balance) = all_balance_streams.into_iter().reduce(|a, b| a + b) { + // Call `GetTaddressBalanceStream` for all active funding stream addresses as a stream argument. + let total_balance = rpc_client + .get_taddress_balance_stream(tokio_stream::iter(all_stream_addresses)) + .await? + .into_inner(); - // The result is the sum of the values in both addresses - assert_eq!( - balance_both.value_zat, - balance_zf.value_zat + balance_mg.value_zat - ); + // The result should be the sum of the values in all active funding stream addresses. + assert_eq!(total_balance.value_zat, expected_total_balance); + } let sapling_treestate_init_height = sapling_activation_height + 1; From d150018ffc2724f57ddff1bca661ce6eecfaef72 Mon Sep 17 00:00:00 2001 From: Arya Date: Mon, 9 Dec 2024 23:17:40 -0500 Subject: [PATCH 15/16] fixes the rest of wallet_grpc_test --- .../common/lightwalletd/wallet_grpc_test.rs | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs index a6eafd6b2c1..a26ada3f9c3 100644 --- a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs +++ b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs @@ -334,6 +334,37 @@ pub async fn run() -> Result<()> { all_stream_addresses.push(stream_address); all_balance_streams.push(balance_stream.value_zat); + + // Call `GetAddressUtxos` with the active funding stream address that will always have utxos + let utxos = rpc_client + .get_address_utxos(GetAddressUtxosArg { + addresses: vec![fs_address.to_string()], + start_height: 1, + max_entries: 1, + }) + .await? + .into_inner(); + + // As we requested one entry we should get a response of length 1 + assert_eq!(utxos.address_utxos.len(), 1); + + // Call `GetAddressUtxosStream` with the active funding stream address that will always have utxos + let mut utxos_zf = rpc_client + .get_address_utxos_stream(GetAddressUtxosArg { + addresses: vec![fs_address.to_string()], + start_height: 1, + max_entries: 2, + }) + .await? + .into_inner(); + + let mut counter = 0; + while let Some(_utxos) = utxos_zf.message().await? { + counter += 1; + } + // As we are in a "in sync" chain we know there are more than 2 utxos for this address (coinbase maturity rule) + // but we will receive the max of 2 from the stream response because we used a limit of 2 `max_entries`. + assert_eq!(2, counter); } if let Some(expected_total_balance) = all_balance_streams.into_iter().reduce(|a, b| a + b) { @@ -375,37 +406,6 @@ pub async fn run() -> Result<()> { *zebra_test::vectors::SAPLING_TREESTATE_MAINNET_419201_STRING ); - // Call `GetAddressUtxos` with the ZF funding stream address that will always have utxos - let utxos = rpc_client - .get_address_utxos(GetAddressUtxosArg { - addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()], - start_height: 1, - max_entries: 1, - }) - .await? - .into_inner(); - - // As we requested one entry we should get a response of length 1 - assert_eq!(utxos.address_utxos.len(), 1); - - // Call `GetAddressUtxosStream` with the ZF funding stream address that will always have utxos - let mut utxos_zf = rpc_client - .get_address_utxos_stream(GetAddressUtxosArg { - addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()], - start_height: 1, - max_entries: 2, - }) - .await? - .into_inner(); - - let mut counter = 0; - while let Some(_utxos) = utxos_zf.message().await? { - counter += 1; - } - // As we are in a "in sync" chain we know there are more than 2 utxos for this address - // but we will receive the max of 2 from the stream response because we used a limit of 2 `max_entries`. - assert_eq!(2, counter); - // Call `GetLightdInfo` let lightd_info = rpc_client.get_lightd_info(Empty {}).await?.into_inner(); From 6d2c66169d254865b8c26064a50ec69b58f965f4 Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 10 Dec 2024 20:26:41 -0500 Subject: [PATCH 16/16] Update zebrad/src/components/mempool/downloads.rs Co-authored-by: Conrado Gouvea --- zebrad/src/components/mempool/downloads.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 37876c44ace..68d29aadcaf 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -124,7 +124,7 @@ pub enum TransactionDownloadVerifyError { #[error("transaction download / verification was cancelled")] Cancelled, - #[error("transaction did not pass consensus validation, error: {0:?}")] + #[error("transaction did not pass consensus validation: {0}")] Invalid(#[from] zebra_consensus::error::TransactionError), }