diff --git a/availability-oracle/src/main.rs b/availability-oracle/src/main.rs
index b5d4eec..efe91d1 100644
--- a/availability-oracle/src/main.rs
+++ b/availability-oracle/src/main.rs
@@ -18,6 +18,7 @@ use manifest::{Abi, DataSource, Manifest, Mapping};
 use network_subgraph::*;
 use secp256k1::SecretKey;
 use std::sync::Arc;
+use std::time::SystemTime;
 use std::time::{Duration, Instant};
 use std::{fmt::Display, str::FromStr};
 use structopt::StructOpt;
@@ -148,6 +149,8 @@ struct Config {
     pub oracle_index: Option,
 }
 
+const VALID_DEPLOYMENT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 24);
+
 #[tokio::main]
 async fn main() -> Result<()> {
     common::main(run).await
@@ -159,6 +162,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
     let epoch_subgraph =
         EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
     let contract: Box<dyn StateManager> = if config.dry_run {
+        info!(
+            logger,
+            "Running in dry mode: no transactions will be submitted on chain!"
+        );
         Box::new(StateManagerDryRun::new(logger.clone()))
     } else {
         let signing_key: &SecretKey = &config.signing_key.unwrap().parse()?;
@@ -181,6 +188,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
     if config.period > Duration::from_secs(0) {
         let mut interval = tokio::time::interval(config.period);
         interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+        // Valid deployments get checked only every VALID_DEPLOYMENT_CACHE_TTL seconds
+        let mut valid_deployment_cache: Vec<(Cid, SystemTime)> = Vec::new();
+
         loop {
             interval.tick().await;
 
@@ -197,11 +208,16 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
                 grace_period,
                 epoch_subgraph.clone(),
                 &config.supported_data_source_kinds,
+                valid_deployment_cache.clone(),
             )
             .await
             {
-                Ok(()) => {
+                Ok(updated_deployment_cache) => {
                     METRICS.reconcile_runs_ok.inc();
+                    valid_deployment_cache = updated_deployment_cache;
+                    info!(logger, "Deployment cache updated";
+                        "count" => valid_deployment_cache.len()
+                    );
                 }
                 Err(e) => {
                     METRICS.reconcile_runs_err.inc();
@@ -221,7 +237,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
             ipfs.invalidate_cache();
         }
     }
-    reconcile_deny_list(
+    match reconcile_deny_list(
         &logger,
         &ipfs,
         &*contract,
@@ -230,8 +246,13 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
         grace_period,
         epoch_subgraph.clone(),
         &config.supported_data_source_kinds,
+        Vec::new(),
     )
     .await
+    {
+        Ok(_) => return Ok(()),
+        Err(e) => return Err(e),
+    }
 }
 
 // This function is used to create a state manager based on the configuration.
@@ -281,7 +302,8 @@ pub async fn reconcile_deny_list(
     grace_period: Duration,
     epoch_subgraph: Arc<dyn EpochBlockOracleSubgraph>,
     supported_ds_kinds: &[String],
-) -> Result<(), Error> {
+    valid_deployment_cache: Vec<(Cid, SystemTime)>,
+) -> Result<Vec<(Cid, SystemTime)>, Error> {
     let logger = logger.clone();
 
     // Fetch supported networks
@@ -300,20 +322,35 @@ pub async fn reconcile_deny_list(
     );
 
     // Check the availability status of all subgraphs, and gather which should flip the deny flag.
-    let status_changes: Vec<([u8; 32], bool)> = subgraph
+    let deployment_status: Vec<([u8; 32], bool, bool, SystemTime)> = subgraph
         .deployments_over_threshold(min_signal, grace_period)
         .map(|deployment| async {
             let deployment = deployment?;
             let id = bytes32_to_cid_v0(deployment.id);
-            let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
-                Ok(()) => Valid::Yes,
-                Err(CheckError::Invalid(e)) => Valid::No(e),
-                Err(CheckError::Other(e)) => return Err(e),
+
+            // Valid subgraphs are only checked every VALID_DEPLOYMENT_CACHE_TTL seconds to reduce IPFS requests
+            let cached = valid_deployment_cache
+                .iter()
+                .filter(|(_, last_validated)| {
+                    last_validated.elapsed().unwrap() < VALID_DEPLOYMENT_CACHE_TTL
+                })
+                .find(|(cid, _)| *cid == id);
+
+            if cached.is_some() {
+                METRICS.valid_deployment_cache_hits.inc();
+                return Ok((deployment, Valid::Yes, cached.unwrap().1));
+            } else {
+                let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await
+                {
+                    Ok(()) => Valid::Yes,
+                    Err(CheckError::Invalid(e)) => Valid::No(e),
+                    Err(CheckError::Other(e)) => return Err(e),
+                };
+                return Ok((deployment, validity, SystemTime::now()));
             };
-            Result::<_, Error>::Ok((deployment, validity))
         })
         .buffered(100)
-        .try_filter_map(move |(deployment, validity)| {
+        .try_filter_map(move |(deployment, validity, last_validated)| {
             let logger = logger.clone();
             async move {
                 info!(logger, "Check subgraph";
@@ -336,7 +373,7 @@ pub async fn reconcile_deny_list(
                         );
                     }
                 };
-                None
+                Some((deployment.id, should_deny, false, last_validated))
             }
 
             // The validity status changed, flip the deny flag.
@@ -347,7 +384,7 @@ pub async fn reconcile_deny_list(
                     "status" => should_deny,
                     "reason" => validity.to_string(),
                 );
-                Some((deployment.id, should_deny))
+                Some((deployment.id, should_deny, true, last_validated))
                 }
             })
         }
@@ -355,7 +392,24 @@ pub async fn reconcile_deny_list(
         .try_collect()
         .await?;
 
-    state_manager.deny_many(status_changes).await
+    // Flip on chain status for those deployments that changed
+    let changed_deployments = deployment_status
+        .iter()
+        .filter(|(_, _, status_changed, _)| *status_changed)
+        .map(|(cid, should_deny, _, _)| (*cid, *should_deny))
+        .collect();
+    match state_manager.deny_many(changed_deployments).await {
+        Ok(_) => {}
+        Err(e) => return Err(e),
+    };
+
+    // Return updated deployment cache
+    let updated_deployment_cache: Vec<(Cid, SystemTime)> = deployment_status
+        .iter()
+        .filter(|(_, should_deny, _, _)| !*should_deny)
+        .map(|(cid, _, _, last_validated)| (bytes32_to_cid_v0(*cid), *last_validated))
+        .collect();
+    Ok(updated_deployment_cache)
 }
 
 enum Valid {
@@ -537,6 +591,7 @@ struct Metrics {
     reconcile_runs_total: prometheus::IntCounter,
     reconcile_runs_ok: prometheus::IntCounter,
     reconcile_runs_err: prometheus::IntCounter,
+    valid_deployment_cache_hits: prometheus::IntCounter,
 }
 
 lazy_static! {
@@ -561,6 +616,11 @@ impl Metrics {
                 "Total reconcile runs with errors"
            )
            .unwrap(),
+            valid_deployment_cache_hits: prometheus::register_int_counter!(
+                "valid_deployment_cache_hits",
+                "Total valid deployment cache hits"
+            )
+            .unwrap(),
        }
    }
}
diff --git a/availability-oracle/src/test.rs b/availability-oracle/src/test.rs
index 32c3a1c..fd13864 100644
--- a/availability-oracle/src/test.rs
+++ b/availability-oracle/src/test.rs
@@ -61,9 +61,10 @@ mod tests {
                 "file/ipfs".into(),
                 "substreams".into(),
             ],
+            vec![],
         )
         .await
-        .unwrap()
+        .unwrap();
     }
 
     struct MockSubgraph;