Skip to content

Commit

Permalink
fixes a concurrency bug, adds more detailed errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Nov 29, 2024
1 parent 65982f7 commit 07fa64e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
2 changes: 1 addition & 1 deletion zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub enum TransactionDownloadVerifyError {
#[error("transaction download / verification was cancelled")]
Cancelled,

#[error("transaction did not pass consensus validation")]
#[error("transaction did not pass consensus validation, error: {0}")]
Invalid(#[from] zebra_consensus::error::TransactionError),
}

Expand Down
21 changes: 21 additions & 0 deletions zebrad/tests/common/lightwalletd/send_transaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub async fn run() -> Result<()> {
?lightwalletd_rpc_port,
"connecting gRPC client to lightwalletd...",
);
zebrad.wait_for_stdout_line(None);

let mut rpc_client = connect_to_lightwalletd(lightwalletd_rpc_port).await?;

Expand All @@ -181,11 +182,14 @@ pub async fn run() -> Result<()> {
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();
zebrad.wait_for_stdout_line(None);

let mut has_tx_with_shielded_elements = false;
let mut counter = 0;

for block in blocks {
zebrad.wait_for_stdout_line(None);

let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block(
zebrad,
&mut rpc_client,
Expand All @@ -195,6 +199,8 @@ pub async fn run() -> Result<()> {
.await?;

zebrad = zebrad_child;
zebrad.wait_for_stdout_line(None);

has_tx_with_shielded_elements |= has_shielded_elements;
counter += count;

Expand All @@ -218,6 +224,8 @@ pub async fn run() -> Result<()> {
/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate.
///
/// Returns the zebrad test child that's handling the RPC requests.
#[tracing::instrument(skip_all)]
async fn send_transactions_from_block(
mut zebrad: TestChild<tempfile::TempDir>,
rpc_client: &mut CompactTxStreamerClient<tonic::transport::Channel>,
Expand Down Expand Up @@ -264,6 +272,8 @@ async fn send_transactions_from_block(
};

tracing::info!(?transaction_hash, "sending transaction...");
// TODO: This may consume expected lines, try increasing the tracing buffer size instead if a full stdout pipe is causing panics.
zebrad.wait_for_stdout_line(None);

let request = prepare_send_transaction_request(transaction.clone());

Expand All @@ -284,18 +294,24 @@ async fn send_transactions_from_block(
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
} else {
zebrad.wait_for_stdout_line(None);
}

// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
zebrad.wait_for_stdout_line(None);

sleep_until_lwd_last_mempool_refresh.await;
zebrad.wait_for_stdout_line(None);

tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();
zebrad.wait_for_stdout_line(None);

// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
Expand All @@ -319,6 +335,8 @@ async fn send_transactions_from_block(
tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
zebrad.wait_for_stdout_line(None);

let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length");
let hash = transaction::Hash::from_bytes_in_display_order(&hash);

Expand All @@ -330,15 +348,18 @@ async fn send_transactions_from_block(

counter += 1;
}
zebrad.wait_for_stdout_line(None);

// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();
zebrad.wait_for_stdout_line(None);

let mut _counter = 0;
while let Some(_tx) = transaction_stream.message().await? {
// TODO: check tx.data or tx.height here?
_counter += 1;
zebrad.wait_for_stdout_line(None);
}

Ok((zebrad, has_tx_with_shielded_elements, counter))
Expand Down
2 changes: 1 addition & 1 deletion zebrad/tests/common/lightwalletd/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync<
wait_for_zebrad_mempool: bool,
wait_for_zebrad_tip: bool,
) -> Result<(TestChild<TempDir>, TestChild<P>)> {
let is_zebrad_finished = AtomicBool::new(false);
let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip);
let is_lightwalletd_finished = AtomicBool::new(false);

let is_zebrad_finished = &is_zebrad_finished;
Expand Down

0 comments on commit 07fa64e

Please sign in to comment.