Skip to content

Commit

Permalink
Fixes bug in send transaction test
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Nov 22, 2024
1 parent 3983428 commit 4234291
Showing 1 changed file with 117 additions and 95 deletions.
212 changes: 117 additions & 95 deletions zebrad/tests/common/lightwalletd/send_transaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use std::{cmp::min, sync::Arc, time::Duration};
use color_eyre::eyre::Result;

use zebra_chain::{
block::Block,
parameters::Network::{self, *},
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY;
use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY;

Expand All @@ -36,6 +38,7 @@ use crate::common::{
sync::wait_for_zebrad_and_lightwalletd_sync,
wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude},
},
regtest::MiningRpcMethods,
sync::LARGE_CHECKPOINT_TIMEOUT,
test_type::TestType::{self, *},
};
Expand Down Expand Up @@ -85,11 +88,19 @@ pub async fn run() -> Result<()> {
"running gRPC send transaction test using lightwalletd & zebrad",
);

let transactions =
load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?;
let mut count = 0;
let blocks: Vec<Block> =
get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.take_while(|block| {
count += block.transactions.len();
count <= max_sent_transactions()
})
.collect();

tracing::info!(
transaction_count = ?transactions.len(),
blocks_count = ?blocks.len(),
partial_sync_path = ?zebrad_state_path,
"got transactions to send, spawning isolated zebrad...",
);
Expand All @@ -113,6 +124,8 @@ pub async fn run() -> Result<()> {

let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port");

let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address);

tracing::info!(
?test_type,
?zebra_rpc_address,
Expand Down Expand Up @@ -164,110 +177,119 @@ pub async fn run() -> Result<()> {
.await?
.into_inner();

// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
// So we need to wait much longer than that here.
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));

let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();

tracing::info!(
transaction_count = ?transactions.len(),
?transaction_hashes,
"connected gRPC client to lightwalletd, sending transactions...",
);

let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
let transaction_hash = transaction.hash();

// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
has_tx_with_shielded_elements |= transaction.version() >= 4
&& (transaction.has_shielded_inputs() || transaction.has_shielded_outputs());

let expected_response = wallet_grpc::SendResponse {
error_code: 0,
error_message: format!("\"{transaction_hash}\""),
};

tracing::info!(?transaction_hash, "sending transaction...");

let request = prepare_send_transaction_request(transaction);

let response = rpc_client.send_transaction(request).await?.into_inner();

assert_eq!(response, expected_response);
}

// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
sleep_until_lwd_last_mempool_refresh.await;

tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();
for block in blocks {
// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
// So we need to wait much longer than that here.
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));

// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
tracing::info!("checking if lightwalletd has queried the mempool...");

// We need a short timeout here, because sometimes this message is not logged.
zebrad = zebrad.with_timeout(Duration::from_secs(60));
let tx_log =
zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds");
// Reset the failed timeout and give the rest of the test enough time to finish.
#[allow(unused_assignments)]
{
zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT);
}
let transactions = &block.transactions;

if tx_log.is_err() {
tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks");
return Ok(());
}
let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();

tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length");
let hash = transaction::Hash::from_bytes_in_display_order(&hash);
tracing::info!(
transaction_count = ?transactions.len(),
?transaction_hashes,
"connected gRPC client to lightwalletd, sending transactions...",
);

assert!(
transaction_hashes.contains(&hash),
"unexpected transaction {hash:?}\n\
let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
let transaction_hash = transaction.hash();

// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
has_tx_with_shielded_elements |= transaction.version() >= 4
&& (transaction.has_shielded_inputs() || transaction.has_shielded_outputs());

let expected_response = wallet_grpc::SendResponse {
error_code: 0,
error_message: format!("\"{transaction_hash}\""),
};

tracing::info!(?transaction_hash, "sending transaction...");

let request = prepare_send_transaction_request(transaction.clone());

let response = rpc_client.send_transaction(request).await?.into_inner();

assert_eq!(response, expected_response);
}

// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;

// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
sleep_until_lwd_last_mempool_refresh.await;

tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();

// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
tracing::info!("checking if lightwalletd has queried the mempool...");

// We need a short timeout here, because sometimes this message is not logged.
zebrad = zebrad.with_timeout(Duration::from_secs(60));
let tx_log =
zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds");
// Reset the failed timeout and give the rest of the test enough time to finish.
#[allow(unused_assignments)]
{
zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT);
}

if tx_log.is_err() {
tracing::info!(
"lightwalletd didn't query the mempool, skipping mempool contents checks"
);
return Ok(());
}

tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length");
let hash = transaction::Hash::from_bytes_in_display_order(&hash);

assert!(
transaction_hashes.contains(&hash),
"unexpected transaction {hash:?}\n\
in isolated mempool: {tx:?}",
);
);

counter += 1;
}
counter += 1;
}

// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd"
);

// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();
// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();

let mut _counter = 0;
while let Some(_tx) = transaction_stream.message().await? {
// TODO: check tx.data or tx.height here?
_counter += 1;
}

let mut _counter = 0;
while let Some(_tx) = transaction_stream.message().await? {
// TODO: check tx.data or tx.height here?
_counter += 1;
zebrad_rpc_client.submit_block(block).await?;
}

Ok(())
Expand Down

0 comments on commit 4234291

Please sign in to comment.