Skip to content

Commit

Permalink
Scan blocks with sapling keys and write the results to the database (#…
Browse files Browse the repository at this point in the history
…8040)

* Fix availability of tokio::time in scanner

* Create a function that parses a key into a list of keys

* Pass a ChainTipChange to the scanner function

* Convert a scanned block to a sapling result

* Make it easier to pass keys and blocks

* Increase scanner wait times

* Parse keys once at the start of the scan

* Get a block from the state instead of the tip

* Don't log secret keys, only log every 100,000 blocks

* Scan each block and add the results to storage

* Move blocking tasks into spawn_blocking()

* Update the acceptance test

* Use a dummy sapling tree size

* Use a larger dummy size
  • Loading branch information
teor2345 authored Dec 3, 2023
1 parent fc2576b commit 4306a00
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5802,6 +5802,7 @@ dependencies = [
"ff",
"group",
"indexmap 2.1.0",
"itertools 0.12.0",
"jubjub",
"rand 0.8.5",
"semver 1.0.20",
Expand Down
3 changes: 2 additions & 1 deletion zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ categories = ["cryptography::cryptocurrencies"]

color-eyre = "0.6.2"
indexmap = { version = "2.0.1", features = ["serde"] }
itertools = "0.12.0"
semver = "1.0.20"
serde = { version = "1.0.193", features = ["serde_derive"] }
tokio = "1.34.0"
tokio = { version = "1.34.0", features = ["time"] }
tower = "0.4.13"
tracing = "0.1.39"

Expand Down
4 changes: 3 additions & 1 deletion zebra-scan/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use crate::storage::SaplingScanningKey;
/// Configuration for scanning.
pub struct Config {
/// The sapling keys to scan for and the birthday height of each of them.
///
/// Currently only supports Extended Full Viewing Keys in ZIP-32 format.
//
// TODO: allow keys without birthdays
pub sapling_keys_to_scan: IndexMap<SaplingScanningKey, u32>,

/// The scanner results database config.
//
// TODO: Remove fields that are only used by the state to create a common database config.
// TODO: Remove fields that are only used by the state, and create a common database config.
#[serde(flatten)]
db_config: DbConfig,
}
Expand Down
15 changes: 12 additions & 3 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::task::JoinHandle;
use tracing::Instrument;

use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_state::ChainTipChange;

use crate::{scan, storage::Storage, Config};

Expand All @@ -15,19 +16,27 @@ pub fn spawn_init(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();
tokio::spawn(init(config, network, state).in_current_span())

// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
}

/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(config: Config, network: Network, state: scan::State) -> Result<(), Report> {
pub async fn init(
config: Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network))
.wait_for_panics()
.await;

// TODO: add more tasks here?
scan::start(state, storage).await
scan::start(state, chain_tip_change, storage).await
}
221 changes: 183 additions & 38 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,184 @@
//! The scanner task and scanning APIs.
use std::{sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use color_eyre::{eyre::eyre, Report};
use itertools::Itertools;
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::info;

use zcash_client_backend::{
data_api::ScannedBlock,
encoding::decode_extended_full_viewing_key,
proto::compact_formats::{
ChainMetadata, CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx,
},
scanning::{ScanError, ScanningKey},
};
use zcash_primitives::zip32::AccountId;
use zcash_primitives::{
constants::*,
sapling::SaplingIvk,
zip32::{AccountId, DiversifiableFullViewingKey, Scope},
};

use zebra_chain::{
block::Block, diagnostic::task::WaitForPanics, parameters::Network,
serialization::ZcashSerialize, transaction::Transaction,
block::Block,
chain_tip::ChainTip,
diagnostic::task::WaitForPanics,
parameters::Network,
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_state::{ChainTipChange, SaplingScannedResult};

use crate::storage::Storage;
use crate::storage::{SaplingScanningKey, Storage};

/// The generic state type used by the scanner.
pub type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
zebra_state::Request,
>;

/// Wait a few seconds at startup so tip height is always `Some`.
const INITIAL_WAIT: Duration = Duration::from_secs(10);
/// Wait a few seconds at startup for some blocks to get verified.
///
/// But sometimes the state might be empty if the network is slow.
const INITIAL_WAIT: Duration = Duration::from_secs(15);

/// The amount of time between checking and starting new scans.
/// The amount of time between checking for new blocks and starting new scans.
///
/// This is just under half the target block interval.
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// Start the scan task given state and storage.
///
/// - This function is dummy at the moment. It just makes sure we can read the storage and the state.
/// - Modifications here might have an impact in the `scan_task_starts` test.
/// - Real scanning code functionality will be added in the future here.
pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
// We want to make sure the state has a tip height available before we start scanning.
/// We log an info log with progress after this many blocks.
const INFO_LOG_INTERVAL: u32 = 100_000;

/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`.
pub async fn start(
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
) -> Result<(), Report> {
let network = storage.network();
let mut height = storage.min_sapling_birthday_height();

// Read keys from the storage on disk, which can block async execution.
let key_storage = storage.clone();
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;

// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
let parsed_keys: HashMap<
SaplingScanningKey,
(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>),
> = key_birthdays
.keys()
.map(|key| {
let parsed_keys = sapling_key_to_scan_block_keys(key, network)?;
Ok::<_, Report>((key.clone(), parsed_keys))
})
.try_collect()?;

// Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;

loop {
// Make sure we can query the state
let request = state
// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
let block = state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Tip)
.call(zebra_state::Request::Block(height.into()))
.await
.map_err(|e| eyre!(e));
.map_err(|e| eyre!(e))?;

let tip = match request? {
zebra_state::Response::Tip(tip) => tip,
let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => {
// If we've reached the tip, sleep for a while then try and get the same block.
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Read keys from the storage on disk, which can block.
let key_storage = storage.clone();
let available_keys = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;
// Only log at info level every 100,000 blocks
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

for key in available_keys {
// TODO: add debug logs?
if is_info_log {
info!(
"Scanning the blockchain for key {} from block {:?} to {:?}",
key.0, key.1, tip,
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
);
}

tokio::time::sleep(CHECK_INTERVAL).await;
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
);
}

// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
// TODO: scan each key in parallel (after MVP?)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();

// We use a dummy size of the Sapling note commitment tree.
//
// We can't set the size to zero, because the underlying scanning function would return
// `zcash_client_backeng::scanning::ScanError::TreeSizeUnknown`.
//
// And we can't set them close to 0, because the scanner subtracts the number of notes
// in the block, and panics with "attempt to subtract with overflow". The number of
// notes in a block must be less than this value, this is a consensus rule.
let sapling_tree_size = 1 << 16;

tokio::task::spawn_blocking(move || {
let dfvk_res =
scan_block(network, &block, sapling_tree_size, &dfvks).map_err(|e| eyre!(e))?;
let ivk_res =
scan_block(network, &block, sapling_tree_size, &ivks).map_err(|e| eyre!(e))?;

let dfvk_res = scanned_block_to_db_result(dfvk_res);
let ivk_res = scanned_block_to_db_result(ivk_res);

storage.add_sapling_result(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_result(sapling_key, height, ivk_res);

Ok::<_, Report>(())
})
.wait_for_panics()
.await?;
}

height = height
.next()
.expect("a valid blockchain never reaches the max height");
}
}

/// Returns transactions belonging to the given `ScanningKey`.
/// Returns transactions belonging to the given `ScanningKey`. This list of keys should come from
/// a single configured `SaplingScanningKey`.
///
/// # Performance / Hangs
///
Expand All @@ -88,9 +191,9 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
/// - Add prior block metadata once we have access to Zebra's state.
pub fn scan_block<K: ScanningKey>(
network: Network,
block: Arc<Block>,
block: &Arc<Block>,
sapling_tree_size: u32,
scanning_key: &K,
scanning_keys: &[K],
) -> Result<ScannedBlock<K::Nf>, ScanError> {
// TODO: Implement a check that returns early when the block height is below the Sapling
// activation height.
Expand All @@ -105,29 +208,61 @@ pub fn scan_block<K: ScanningKey>(

// Use a dummy `AccountId` as we don't use accounts yet.
let dummy_account = AccountId::from(0);

// We only support scanning one key and one block per function call for now.
let scanning_keys = vec![(&dummy_account, scanning_key)];
let scanning_keys: Vec<_> = scanning_keys
.iter()
.map(|key| (&dummy_account, key))
.collect();

zcash_client_backend::scanning::scan_block(
&network,
block_to_compact(block, chain_metadata),
&scanning_keys,
scanning_keys.as_slice(),
// Ignore whether notes are change from a viewer's own spends for now.
&[],
// Ignore previous blocks for now.
None,
)
}

/// Converts a Zebra-format scanning key into some `scan_block()` keys.
///
/// Currently only accepts extended full viewing keys, and returns both their diversifiable full
/// viewing key and their individual viewing key, for testing purposes.
///
/// TODO: work out what string format is used for SaplingIvk, if any, and support it here
/// performance: stop returning both the dfvk and ivk for the same key
pub fn sapling_key_to_scan_block_keys(
sapling_key: &SaplingScanningKey,
network: Network,
) -> Result<(Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>), Report> {
let hrp = if network.is_a_test_network() {
// Assume custom testnets have the same HRP
//
// TODO: add the regtest HRP here
testnet::HRP_SAPLING_EXTENDED_FULL_VIEWING_KEY
} else {
mainnet::HRP_SAPLING_EXTENDED_FULL_VIEWING_KEY
};

let efvk = decode_extended_full_viewing_key(hrp, sapling_key).map_err(|e| eyre!(e))?;

// Just return all the keys for now, so we can be sure our code supports them.
let dfvk = efvk.to_diversifiable_full_viewing_key();
let eivk = dfvk.to_ivk(Scope::External);
let iivk = dfvk.to_ivk(Scope::Internal);

Ok((vec![dfvk], vec![eivk, iivk]))
}

/// Converts a zebra block and meta data into a compact block.
pub fn block_to_compact(block: Arc<Block>, chain_metadata: ChainMetadata) -> CompactBlock {
pub fn block_to_compact(block: &Arc<Block>, chain_metadata: ChainMetadata) -> CompactBlock {
CompactBlock {
height: block
.coinbase_height()
.expect("verified block should have a valid height")
.0
.into(),
// TODO: performance: look up the block hash from the state rather than recalculating it
hash: block.hash().bytes_in_display_order().to_vec(),
prev_hash: block
.header
Expand Down Expand Up @@ -164,6 +299,7 @@ fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
index: index
.try_into()
.expect("tx index in block should fit in u64"),
// TODO: performance: look up the tx hash from the state rather than recalculating it
hash: tx.hash().bytes_in_display_order().to_vec(),

// `fee` is not checked by the `scan_block` function. It is allowed to be unset.
Expand Down Expand Up @@ -202,3 +338,12 @@ fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
actions: vec![],
}
}

/// Convert a scanned block to a list of scanner database results.
fn scanned_block_to_db_result<Nf>(scanned_block: ScannedBlock<Nf>) -> Vec<SaplingScannedResult> {
scanned_block
.transactions()
.iter()
.map(|tx| transaction::Hash::from_bytes_in_display_order(tx.txid.as_ref()))
.collect()
}
Loading

0 comments on commit 4306a00

Please sign in to comment.