Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement deployment cache to reduce IPFS request load #30

Merged
merged 4 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 73 additions & 13 deletions availability-oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use manifest::{Abi, DataSource, Manifest, Mapping};
use network_subgraph::*;
use secp256k1::SecretKey;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use std::{fmt::Display, str::FromStr};
use structopt::StructOpt;
Expand Down Expand Up @@ -148,6 +149,8 @@ struct Config {
pub oracle_index: Option<u64>,
}

const DEPLOYMENT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 24);

#[tokio::main]
async fn main() -> Result<()> {
common::main(run).await
Expand All @@ -159,6 +162,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
let epoch_subgraph =
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
let contract: Box<dyn StateManager> = if config.dry_run {
info!(
logger,
"Running in dry mode: no transactions will be submitted on chain!"
);
Box::new(StateManagerDryRun::new(logger.clone()))
} else {
let signing_key: &SecretKey = &config.signing_key.unwrap().parse()?;
Expand All @@ -181,6 +188,10 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
if config.period > Duration::from_secs(0) {
let mut interval = tokio::time::interval(config.period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

// Valid deployments get checked only every DEPLOYMENT_CACHE_TTL seconds
let mut deployment_cache: Vec<(Cid, SystemTime)> = Vec::new();

loop {
interval.tick().await;

Expand All @@ -197,11 +208,16 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
grace_period,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
deployment_cache.clone(),
)
.await
{
Ok(()) => {
Ok(updated_deployment_cache) => {
METRICS.reconcile_runs_ok.inc();
deployment_cache = updated_deployment_cache;
info!(logger, "Deployment cache updated";
"count" => deployment_cache.len()
);
}
Err(e) => {
METRICS.reconcile_runs_err.inc();
Expand All @@ -221,7 +237,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
ipfs.invalidate_cache();
}
}
reconcile_deny_list(
match reconcile_deny_list(
&logger,
&ipfs,
&*contract,
Expand All @@ -230,8 +246,13 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
grace_period,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
Vec::new(),
)
.await
{
Ok(_) => return Ok(()),
Err(e) => return Err(e),
}
}

// This function is used to create a state manager based on the configuration.
Expand Down Expand Up @@ -281,7 +302,8 @@ pub async fn reconcile_deny_list(
grace_period: Duration,
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
supported_ds_kinds: &[String],
) -> Result<(), Error> {
deployment_cache: Vec<(Cid, SystemTime)>,
) -> Result<Vec<(Cid, SystemTime)>, Error> {
let logger = logger.clone();

// Fetch supported networks
Expand All @@ -300,20 +322,35 @@ pub async fn reconcile_deny_list(
);

// Check the availability status of all subgraphs, and gather which should flip the deny flag.
let status_changes: Vec<([u8; 32], bool)> = subgraph
let deployment_status: Vec<([u8; 32], bool, bool, SystemTime)> = subgraph
.deployments_over_threshold(min_signal, grace_period)
.map(|deployment| async {
let deployment = deployment?;
let id = bytes32_to_cid_v0(deployment.id);
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),

// Valid subgraphs are only checked every DEPLOYMENT_CACHE_TTL seconds to reduce IPFS requests
let cached = deployment_cache
.iter()
.filter(|(_, last_validated)| {
last_validated.elapsed().unwrap() < DEPLOYMENT_CACHE_TTL
})
.find(|(cid, _)| *cid == id);

if cached.is_some() {
METRICS.deployment_cache_hits.inc();
return Ok((deployment, Valid::Yes, cached.unwrap().1));
Maikol marked this conversation as resolved.
Show resolved Hide resolved
} else {
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await
{
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),
};
return Ok((deployment, validity, SystemTime::now()));
};
Result::<_, Error>::Ok((deployment, validity))
})
.buffered(100)
.try_filter_map(move |(deployment, validity)| {
.try_filter_map(move |(deployment, validity, last_validated)| {
let logger = logger.clone();
async move {
info!(logger, "Check subgraph";
Expand All @@ -336,7 +373,7 @@ pub async fn reconcile_deny_list(
);
}
};
None
Some((deployment.id, should_deny, false, last_validated))
}

// The validity status changed, flip the deny flag.
Expand All @@ -347,15 +384,32 @@ pub async fn reconcile_deny_list(
"status" => should_deny,
"reason" => validity.to_string(),
);
Some((deployment.id, should_deny))
Some((deployment.id, should_deny, true, last_validated))
}
})
}
})
.try_collect()
.await?;

state_manager.deny_many(status_changes).await
// Flip on chain status for those deployments that changed
let changed_deployments = deployment_status
.iter()
.filter(|(_, _, status_changed, _)| *status_changed)
.map(|(cid, should_deny, _, _)| (*cid, *should_deny))
.collect();
match state_manager.deny_many(changed_deployments).await {
Ok(_) => {}
Err(e) => return Err(e),
};

// Return updated deployment cache
let updated_deployment_cache: Vec<(Cid, SystemTime)> = deployment_status
.iter()
.filter(|(_, should_deny, _, _)| !*should_deny)
.map(|(cid, _, _, last_validated)| (bytes32_to_cid_v0(*cid), *last_validated))
.collect();
Ok(updated_deployment_cache)
}

enum Valid {
Expand Down Expand Up @@ -537,6 +591,7 @@ struct Metrics {
reconcile_runs_total: prometheus::IntCounter,
reconcile_runs_ok: prometheus::IntCounter,
reconcile_runs_err: prometheus::IntCounter,
deployment_cache_hits: prometheus::IntCounter,
}

lazy_static! {
Expand All @@ -561,6 +616,11 @@ impl Metrics {
"Total reconcile runs with errors"
)
.unwrap(),
deployment_cache_hits: prometheus::register_int_counter!(
"deployment_cache_hits",
"Total deployment cache hits"
)
.unwrap(),
}
}
}
4 changes: 3 additions & 1 deletion availability-oracle/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod tests {
use futures::Stream;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::{pin::Pin, str::FromStr};
use tiny_cid::Cid;

Expand Down Expand Up @@ -61,9 +62,10 @@ mod tests {
"file/ipfs".into(),
"substreams".into(),
],
vec![]
)
.await
.unwrap()
.unwrap();
}

struct MockSubgraph;
Expand Down
Loading