diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index bee6cf78356..6ac031e491b 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -16,15 +16,18 @@ //! were obtained. This is to ensure that zebra does not reject the transactions because they have //! already been seen in a block. -use std::{cmp::min, sync::Arc, time::Duration}; +use std::{cmp::min, collections::HashSet, sync::Arc}; +use tower::BoxError; -use color_eyre::eyre::Result; +use color_eyre::eyre::{eyre, Result}; use zebra_chain::{ - parameters::Network::{self, *}, + block::Block, + parameters::Network::*, serialization::ZcashSerialize, transaction::{self, Transaction}, }; +use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; @@ -34,10 +37,13 @@ use crate::common::{ lightwalletd::{ can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc, sync::wait_for_zebrad_and_lightwalletd_sync, - wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude}, + wallet_grpc::{ + self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd, + Empty, Exclude, + }, }, - sync::LARGE_CHECKPOINT_TIMEOUT, - test_type::TestType::{self, *}, + regtest::MiningRpcMethods, + test_type::TestType::*, }; /// The maximum number of transactions we want to send in the test. @@ -85,11 +91,19 @@ pub async fn run() -> Result<()> { "running gRPC send transaction test using lightwalletd & zebrad", ); - let transactions = - load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?; + let mut count = 0; + let blocks: Vec = + get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) + .await? + .into_iter() + .take_while(|block| { + count += block.transactions.len() - 1; + count <= max_sent_transactions() + }) + .collect(); tracing::info!( - transaction_count = ?transactions.len(), + blocks_count = ?blocks.len(), partial_sync_path = ?zebrad_state_path, "got transactions to send, spawning isolated zebrad...", ); @@ -113,6 +127,8 @@ pub async fn run() -> Result<()> { let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port"); + let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address); + tracing::info!( ?test_type, ?zebra_rpc_address, @@ -134,7 +150,7 @@ pub async fn run() -> Result<()> { "spawned lightwalletd connected to zebrad, waiting for them both to sync...", ); - let (_lightwalletd, mut zebrad) = wait_for_zebrad_and_lightwalletd_sync( + let (_lightwalletd, _zebrad) = wait_for_zebrad_and_lightwalletd_sync( lightwalletd, lightwalletd_rpc_port, zebrad, @@ -164,6 +180,53 @@ pub async fn run() -> Result<()> { .await? .into_inner(); + let mut transaction_hashes = HashSet::new(); + let mut has_tx_with_shielded_elements = false; + let mut counter = 0; + + for block in blocks { + let (has_shielded_elements, count) = send_transactions_from_block( + &mut rpc_client, + &zebrad_rpc_client, + block.clone(), + &mut transaction_hashes, + ) + .await?; + + has_tx_with_shielded_elements |= has_shielded_elements; + counter += count; + + tracing::info!( + height = ?block.coinbase_height(), + "submitting block at height" + ); + + let submit_block_response = zebrad_rpc_client.submit_block(block).await; + tracing::info!(?submit_block_response, "submitted block"); + } + + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + assert!( + !has_tx_with_shielded_elements || counter >= 1, + "failed to read v4+ transactions with shielded elements \ + from future blocks in mempool via lightwalletd" + ); + + Ok(()) +} + +/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions +/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. +/// +/// Returns the zebrad test child that's handling the RPC requests. + +#[tracing::instrument(skip_all)] +async fn send_transactions_from_block( + rpc_client: &mut CompactTxStreamerClient, + zebrad_rpc_client: &RpcRequestClient, + block: Block, + transaction_hashes: &mut HashSet, +) -> Result<(bool, usize)> { // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: // // @@ -171,8 +234,17 @@ pub async fn run() -> Result<()> { let sleep_until_lwd_last_mempool_refresh = tokio::time::sleep(std::time::Duration::from_secs(4)); - let transaction_hashes: Vec = - transactions.iter().map(|tx| tx.hash()).collect(); + let transactions: Vec<_> = block + .transactions + .iter() + .filter(|tx| !tx.is_coinbase()) + .collect(); + + if transactions.is_empty() { + return Ok((false, 0)); + } + + transaction_hashes.extend(transactions.iter().map(|tx| tx.hash())); tracing::info!( transaction_count = ?transactions.len(), @@ -181,7 +253,7 @@ pub async fn run() -> Result<()> { ); let mut has_tx_with_shielded_elements = false; - for transaction in transactions { + for &transaction in &transactions { let transaction_hash = transaction.hash(); // See @@ -195,20 +267,24 @@ pub async fn run() -> Result<()> { tracing::info!(?transaction_hash, "sending transaction..."); - let request = prepare_send_transaction_request(transaction); + let request = prepare_send_transaction_request(transaction.clone()); - let response = rpc_client.send_transaction(request).await?.into_inner(); + match rpc_client.send_transaction(request).await { + Ok(response) => assert_eq!(response.into_inner(), expected_response), + Err(err) => { + tracing::warn!(?err, "failed to send transaction"); + let send_tx_rsp = zebrad_rpc_client + .send_transaction(transaction) + .await + .map_err(|e| eyre!(e)); - assert_eq!(response, expected_response); + tracing::warn!(?send_tx_rsp, "failed to send tx twice"); + } + }; } - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // - tokio::time::sleep(std::time::Duration::from_secs(2)).await; sleep_until_lwd_last_mempool_refresh.await; tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); @@ -217,25 +293,6 @@ pub async fn run() -> Result<()> { .await? .into_inner(); - // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. - // If that happens, we skip the rest of the test. - tracing::info!("checking if lightwalletd has queried the mempool..."); - - // We need a short timeout here, because sometimes this message is not logged. - zebrad = zebrad.with_timeout(Duration::from_secs(60)); - let tx_log = - zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); - // Reset the failed timeout and give the rest of the test enough time to finish. - #[allow(unused_assignments)] - { - zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); - } - - if tx_log.is_err() { - tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); - return Ok(()); - } - tracing::info!("checking the mempool contains some of the sent transactions..."); let mut counter = 0; while let Some(tx) = transactions_stream.message().await? { @@ -251,16 +308,6 @@ pub async fn run() -> Result<()> { counter += 1; } - // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements - assert!( - !has_tx_with_shielded_elements || counter >= 1, - "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" - ); - - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); @@ -270,32 +317,7 @@ pub async fn run() -> Result<()> { _counter += 1; } - Ok(()) -} - -/// Loads transactions from a few block(s) after the chain tip of the cached state. -/// -/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk -/// in the `ZEBRA_CACHED_STATE_DIR`. -/// -/// ## Panics -/// -/// If the provided `test_type` doesn't need an rpc server and cached state -#[tracing::instrument] -async fn load_transactions_from_future_blocks( - network: Network, - test_type: TestType, - test_name: &str, -) -> Result>> { - let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) - .await? - .into_iter() - .flat_map(|block| block.transactions) - .filter(|transaction| !transaction.is_coinbase()) - .take(max_sent_transactions()) - .collect(); - - Ok(transactions) + Ok((has_tx_with_shielded_elements, counter)) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent. @@ -307,3 +329,21 @@ fn prepare_send_transaction_request(transaction: Arc) -> wallet_grp height: 0, } } + +trait SendTransactionMethod { + async fn send_transaction( + &self, + transaction: &Arc, + ) -> Result; +} + +impl SendTransactionMethod for RpcRequestClient { + async fn send_transaction( + &self, + transaction: &Arc, + ) -> Result { + let tx_data = hex::encode(transaction.zcash_serialize_to_vec()?); + self.json_result_from_call("sendrawtransaction", format!(r#"["{tx_data}"]"#)) + .await + } +} diff --git a/zebrad/tests/common/lightwalletd/sync.rs b/zebrad/tests/common/lightwalletd/sync.rs index 8dce05a9150..3a55aedb5c2 100644 --- a/zebrad/tests/common/lightwalletd/sync.rs +++ b/zebrad/tests/common/lightwalletd/sync.rs @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync< wait_for_zebrad_mempool: bool, wait_for_zebrad_tip: bool, ) -> Result<(TestChild, TestChild

)> { - let is_zebrad_finished = AtomicBool::new(false); + let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip); let is_lightwalletd_finished = AtomicBool::new(false); let is_zebrad_finished = &is_zebrad_finished; diff --git a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs index 702bb740142..a26ada3f9c3 100644 --- a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs +++ b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs @@ -37,11 +37,14 @@ use color_eyre::eyre::Result; use hex_literal::hex; use zebra_chain::{ - block::Block, - parameters::Network, - parameters::NetworkUpgrade::{Nu5, Sapling}, + block::{Block, Height}, + parameters::{ + Network, + NetworkUpgrade::{Nu5, Sapling}, + }, serialization::ZcashDeserializeInto, }; +use zebra_consensus::funding_stream_address; use zebra_state::state_database_format_version_in_code; use crate::common::{ @@ -291,60 +294,89 @@ pub async fn run() -> Result<()> { // For the provided address in the first 10 blocks there are 10 transactions in the mainnet assert_eq!(10, counter); - // Call `GetTaddressBalance` with the ZF funding stream address - let balance = rpc_client - .get_taddress_balance(AddressList { - addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()], - }) - .await? - .into_inner(); - - // With ZFND or Major Grants funding stream address, the balance will always be greater than zero, - // because new coins are created in each block - assert!(balance.value_zat > 0); - - // Call `GetTaddressBalanceStream` with the ZF funding stream address as a stream argument - let zf_stream_address = Address { - address: "t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string(), - }; - - let balance_zf = rpc_client - .get_taddress_balance_stream(tokio_stream::iter(vec![zf_stream_address.clone()])) - .await? - .into_inner(); - - // With ZFND funding stream address, the balance will always be greater than zero, - // because new coins are created in each block - assert!(balance_zf.value_zat > 0); - - // Call `GetTaddressBalanceStream` with the MG funding stream address as a stream argument - let mg_stream_address = Address { - address: "t3XyYW8yBFRuMnfvm5KLGFbEVz25kckZXym".to_string(), - }; - - let balance_mg = rpc_client - .get_taddress_balance_stream(tokio_stream::iter(vec![mg_stream_address.clone()])) - .await? - .into_inner(); + let lwd_tip_height: Height = u32::try_from(block_tip.height) + .expect("should be below max block height") + .try_into() + .expect("should be below max block height"); + + let mut all_stream_addresses = Vec::new(); + let mut all_balance_streams = Vec::new(); + for &fs_receiver in network.funding_streams(lwd_tip_height).recipients().keys() { + let Some(fs_address) = funding_stream_address(lwd_tip_height, &network, fs_receiver) else { + // Skip if the lightwalletd tip height is above the funding stream end height. + continue; + }; + + tracing::info!(?fs_address, "getting balance for active fs address"); + + // Call `GetTaddressBalance` with the active funding stream address. + let balance = rpc_client + .get_taddress_balance(AddressList { + addresses: vec![fs_address.to_string()], + }) + .await? + .into_inner(); + + // Call `GetTaddressBalanceStream` with the active funding stream address as a stream argument. + let stream_address = Address { + address: fs_address.to_string(), + }; + + let balance_stream = rpc_client + .get_taddress_balance_stream(tokio_stream::iter(vec![stream_address.clone()])) + .await? + .into_inner(); + + // With any active funding stream address, the balance will always be greater than zero for blocks + // below the funding stream end height because new coins are created in each block. + assert!(balance.value_zat > 0); + assert!(balance_stream.value_zat > 0); + + all_stream_addresses.push(stream_address); + all_balance_streams.push(balance_stream.value_zat); + + // Call `GetAddressUtxos` with the active funding stream address that will always have utxos + let utxos = rpc_client + .get_address_utxos(GetAddressUtxosArg { + addresses: vec![fs_address.to_string()], + start_height: 1, + max_entries: 1, + }) + .await? + .into_inner(); + + // As we requested one entry we should get a response of length 1 + assert_eq!(utxos.address_utxos.len(), 1); + + // Call `GetAddressUtxosStream` with the active funding stream address that will always have utxos + let mut utxos_zf = rpc_client + .get_address_utxos_stream(GetAddressUtxosArg { + addresses: vec![fs_address.to_string()], + start_height: 1, + max_entries: 2, + }) + .await? + .into_inner(); + + let mut counter = 0; + while let Some(_utxos) = utxos_zf.message().await? { + counter += 1; + } + // As we are in a "in sync" chain we know there are more than 2 utxos for this address (coinbase maturity rule) + // but we will receive the max of 2 from the stream response because we used a limit of 2 `max_entries`. + assert_eq!(2, counter); + } - // With Major Grants funding stream address, the balance will always be greater than zero, - // because new coins are created in each block - assert!(balance_mg.value_zat > 0); + if let Some(expected_total_balance) = all_balance_streams.into_iter().reduce(|a, b| a + b) { + // Call `GetTaddressBalanceStream` for all active funding stream addresses as a stream argument. + let total_balance = rpc_client + .get_taddress_balance_stream(tokio_stream::iter(all_stream_addresses)) + .await? + .into_inner(); - // Call `GetTaddressBalanceStream` with both, the ZFND and the MG funding stream addresses as a stream argument - let balance_both = rpc_client - .get_taddress_balance_stream(tokio_stream::iter(vec![ - zf_stream_address, - mg_stream_address, - ])) - .await? - .into_inner(); - - // The result is the sum of the values in both addresses - assert_eq!( - balance_both.value_zat, - balance_zf.value_zat + balance_mg.value_zat - ); + // The result should be the sum of the values in all active funding stream addresses. + assert_eq!(total_balance.value_zat, expected_total_balance); + } let sapling_treestate_init_height = sapling_activation_height + 1; @@ -374,37 +406,6 @@ pub async fn run() -> Result<()> { *zebra_test::vectors::SAPLING_TREESTATE_MAINNET_419201_STRING ); - // Call `GetAddressUtxos` with the ZF funding stream address that will always have utxos - let utxos = rpc_client - .get_address_utxos(GetAddressUtxosArg { - addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()], - start_height: 1, - max_entries: 1, - }) - .await? - .into_inner(); - - // As we requested one entry we should get a response of length 1 - assert_eq!(utxos.address_utxos.len(), 1); - - // Call `GetAddressUtxosStream` with the ZF funding stream address that will always have utxos - let mut utxos_zf = rpc_client - .get_address_utxos_stream(GetAddressUtxosArg { - addresses: vec!["t3dvVE3SQEi7kqNzwrfNePxZ1d4hUyztBA1".to_string()], - start_height: 1, - max_entries: 2, - }) - .await? - .into_inner(); - - let mut counter = 0; - while let Some(_utxos) = utxos_zf.message().await? { - counter += 1; - } - // As we are in a "in sync" chain we know there are more than 2 utxos for this address - // but we will receive the max of 2 from the stream response because we used a limit of 2 `max_entries`. - assert_eq!(2, counter); - // Call `GetLightdInfo` let lightd_info = rpc_client.get_lightd_info(Empty {}).await?.into_inner();