diff --git a/README.md b/README.md index 860ae1e..d272384 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,9 @@ FLAGS: -V, --version Prints version information OPTIONS: + --epoch-block-oracle-subgraph + Graphql endpoint to the epoch block oracle subgraph used for fetching supported networks [env: EPOCH_BLOCK_ORACLE_SUBGRAPH=] + --grace-period Grace period, in seconds from subgraph creation, for which subgraphs will not be checked [env: ORACLE_GRACE_PERIOD=] [default: 0] @@ -22,7 +25,7 @@ OPTIONS: --ipfs-timeout IPFS timeout after which a file will be considered unavailable [env: ORACLE_IPFS_TIMEOUT_SECS=] [default: 30] - + --metrics-port [env: ORACLE_METRICS_PORT=] [default: 8090] @@ -49,9 +52,6 @@ OPTIONS: --supported-data-source-kinds ... a comma separated list of the supported data source kinds [env: SUPPORTED_DATA_SOURCE_KINDS=] [default: ethereum,ethereum/contract,file/ipfs,substreams,file/arweave] - - -s, --supported-networks ... - a comma separated list of the supported network ids [env: SUPPORTED_NETWORKS=] [default: mainnet] --url RPC url for the network [env: RPC_URL=] diff --git a/availability-oracle/src/epoch_block_oracle_subgraph.rs b/availability-oracle/src/epoch_block_oracle_subgraph.rs new file mode 100644 index 0000000..40c41f7 --- /dev/null +++ b/availability-oracle/src/epoch_block_oracle_subgraph.rs @@ -0,0 +1,116 @@ +use common::prelude::*; +use futures::stream; +use futures::Stream; +use reqwest::Client; +use serde_derive::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::pin::Pin; +use std::sync::Arc; + +pub trait EpochBlockOracleSubgraph { + fn supported_networks(self: Arc) -> Pin>>>; +} + +pub struct EpochBlockOracleSubgraphImpl { + logger: Logger, + endpoint: String, + client: Client, +} + +impl EpochBlockOracleSubgraphImpl { + pub fn new(logger: Logger, endpoint: String) -> Arc { + Arc::new(EpochBlockOracleSubgraphImpl { + logger, + endpoint, + client: Client::new(), + }) + } +} + +#[derive(Serialize)] +struct GraphqlRequest { + query: String, + variables: BTreeMap, +} + +#[derive(Deserialize)] +struct GraphqlResponse { + data: Option>, + errors: Option>, +} + +const SUPPORTED_NETWORKS_QUERY: &str = r#" +query Networks($skip: Int!) { + networks(first: 1000, skip: $skip) { + id + alias + } + } +"#; + +impl EpochBlockOracleSubgraph for EpochBlockOracleSubgraphImpl { + fn supported_networks(self: Arc) -> Pin>>> { + stream::iter((0..).step_by(1000)) + .then(move |skip| { + let this = self.clone(); + async move { + let req = GraphqlRequest { + query: SUPPORTED_NETWORKS_QUERY.to_string(), + variables: vec![("skip".to_string(), skip.into())] + .into_iter() + .collect(), + }; + + let res: GraphqlResponse = this + .client + .post(&this.endpoint) + .json(&req) + .send() + .await? + .error_for_status()? + .json() + .await?; + + if let Some(errs) = res.errors.filter(|errs| !errs.is_empty()) { + return Err(anyhow!( + "error querying supported networks from subgraph {}", + serde_json::to_string(&errs)? + )); + } + + let data = res + .data + .ok_or_else(|| anyhow!("Data field is missing in the response"))? + .remove("networks") + .ok_or_else(|| anyhow!("'networks' field is missing in the data"))?; + + #[derive(Deserialize)] + #[allow(non_snake_case)] + struct RawNetwork { + id: String, + alias: String, + } + + let page: Vec = serde_json::from_value(data)?; + let page: Vec = page + .into_iter() + .flat_map(|raw_network| vec![raw_network.id, raw_network.alias]) + .collect(); + + trace!(this.logger, "networks page"; "page_size" => page.len()); + + Ok(page) + } + }) + .take_while(|networks| { + let keep_paginating = match networks { + Ok(networks) => !networks.is_empty(), + Err(_) => true, + }; + async move { keep_paginating } + }) + .map_ok(|networks| stream::iter(networks.into_iter().map(Ok))) + .try_flatten() + .boxed() + } +} diff --git a/availability-oracle/src/main.rs b/availability-oracle/src/main.rs index 5396f55..397906c 100644 --- a/availability-oracle/src/main.rs +++ b/availability-oracle/src/main.rs @@ -1,4 +1,5 @@ mod contract; +mod epoch_block_oracle_subgraph; mod ipfs; mod manifest; mod network_subgraph; @@ -8,6 +9,7 @@ mod util; use common::prelude::*; use common::prometheus; use contract::*; +use epoch_block_oracle_subgraph::{EpochBlockOracleSubgraph, EpochBlockOracleSubgraphImpl}; use ethers::abi::Address; use ethers::signers::LocalWallet; use ethers::signers::Signer; @@ -105,14 +107,11 @@ struct Config { metrics_port: u16, #[structopt( - short, long, - default_value = "mainnet", - value_delimiter = ",", - env = "SUPPORTED_NETWORKS", - help = "a comma separated list of the supported network ids" + env = "EPOCH_BLOCK_ORACLE_SUBGRAPH", + help = "Graphql endpoint to the epoch block oracle subgraph" )] - supported_networks: Vec, + epoch_block_oracle_subgraph: String, // Note: `ethereum/contract` is a valid alias for `ethereum` #[structopt( @@ -157,6 +156,8 @@ async fn main() -> Result<()> { async fn run(logger: Logger, config: Config) -> Result<()> { let ipfs = IpfsImpl::new(config.ipfs, config.ipfs_concurrency, config.ipfs_timeout); let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph); + let epoch_subgraph = + EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph); let contract: Box = if config.dry_run { Box::new(StateManagerDryRun::new(logger.clone())) } else { @@ -194,7 +195,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> { subgraph.clone(), config.min_signal, grace_period, - &config.supported_networks, + epoch_subgraph.clone(), &config.supported_data_source_kinds, ) .await @@ -227,7 +228,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> { subgraph, config.min_signal, grace_period, - &config.supported_networks, + epoch_subgraph.clone(), &config.supported_data_source_kinds, ) .await @@ -278,18 +279,33 @@ pub async fn reconcile_deny_list( subgraph: Arc, min_signal: u64, grace_period: Duration, - supported_network_ids: &[String], + epoch_subgraph: Arc, supported_ds_kinds: &[String], ) -> Result<(), Error> { let logger = logger.clone(); + // Fetch supported networks + let mut supported_networks = Vec::new(); + let networks_stream = epoch_subgraph.supported_networks(); + futures::pin_mut!(networks_stream); + while let Some(network) = networks_stream.next().await { + match network { + Ok(network_id) => supported_networks.push(network_id), + Err(e) => Err(e)?, + } + } + + info!(logger, "Supported networks"; + "alias" => supported_networks.join(", ") + ); + // Check the availability status of all subgraphs, and gather which should flip the deny flag. let status_changes: Vec<([u8; 32], bool)> = subgraph .deployments_over_threshold(min_signal, grace_period) .map(|deployment| async { let deployment = deployment?; let id = bytes32_to_cid_v0(deployment.id); - let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await { + let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await { Ok(()) => Valid::Yes, Err(CheckError::Invalid(e)) => Valid::No(e), Err(CheckError::Other(e)) => return Err(e), @@ -419,7 +435,7 @@ impl From for CheckError { async fn check( ipfs: &impl Ipfs, deployment_id: Cid, - supported_network_ids: &[String], + supported_networks: &[String], supported_ds_kinds: &[String], ) -> Result<(), CheckError> { fn check_link(file: &manifest::Link) -> Result { @@ -483,7 +499,7 @@ async fn check( // - That network is listed in the `supported_networks` list match (network, ds_network) { (None, Some(ds_network)) => { - if !supported_network_ids.contains(ds_network) { + if !supported_networks.contains(ds_network) { return Err(Invalid::UnsupportedNetwork(ds_network.clone()).into()); } network = Some(ds_network) diff --git a/availability-oracle/src/test.rs b/availability-oracle/src/test.rs index 2809766..a24b9b5 100644 --- a/availability-oracle/src/test.rs +++ b/availability-oracle/src/test.rs @@ -1,4 +1,5 @@ use crate::contract; +use crate::epoch_block_oracle_subgraph::*; use crate::ipfs::*; use crate::network_subgraph::*; use crate::util::bytes32_to_cid_v0; @@ -51,7 +52,7 @@ async fn test_reconcile() { Arc::new(MockSubgraph), 0, Duration::default(), - &vec!["mainnet".into()], + Arc::new(MockEBOSubgraph), &vec![ "ethereum".into(), "ethereum/contract".into(), @@ -95,6 +96,14 @@ impl NetworkSubgraph for MockSubgraph { } } +struct MockEBOSubgraph; + +impl EpochBlockOracleSubgraph for MockEBOSubgraph { + fn supported_networks(self: Arc) -> Pin>>> { + futures::stream::iter(vec![Ok("mainnet".to_string())]).boxed() + } +} + struct MockIpfs; #[async_trait]