Skip to content

Commit

Permalink
Merge pull request #17 from graphprotocol/mde/use-ebo-subgraph-for-su…
Browse files Browse the repository at this point in the history
…pported-networks

feat: use epoch block oracle subgraph for supported networks
  • Loading branch information
Maikol authored May 16, 2024
2 parents c9108c1 + 8389340 commit e3e7e49
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 17 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ FLAGS:
-V, --version Prints version information
OPTIONS:
--epoch-block-oracle-subgraph <subgraph>
Graphql endpoint to the epoch block oracle subgraph used for fetching supported networks [env: EPOCH_BLOCK_ORACLE_SUBGRAPH=]
--grace-period <grace-period>
Grace period, in seconds from subgraph creation, for which subgraphs will not be checked [env: ORACLE_GRACE_PERIOD=] [default: 0]
Expand All @@ -22,7 +25,7 @@ OPTIONS:
--ipfs-timeout <ipfs-timeout>
IPFS timeout after which a file will be considered unavailable [env: ORACLE_IPFS_TIMEOUT_SECS=] [default: 30]
--metrics-port <metrics-port>
[env: ORACLE_METRICS_PORT=] [default: 8090]
Expand All @@ -49,9 +52,6 @@ OPTIONS:
--supported-data-source-kinds <supported-data-source-kinds>...
a comma separated list of the supported data source kinds [env: SUPPORTED_DATA_SOURCE_KINDS=] [default: ethereum,ethereum/contract,file/ipfs,substreams,file/arweave]
-s, --supported-networks <supported-networks>...
a comma separated list of the supported network ids [env: SUPPORTED_NETWORKS=] [default: mainnet]
--url <url>
RPC url for the network [env: RPC_URL=]
Expand Down
116 changes: 116 additions & 0 deletions availability-oracle/src/epoch_block_oracle_subgraph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use common::prelude::*;
use futures::stream;
use futures::Stream;
use reqwest::Client;
use serde_derive::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;

pub trait EpochBlockOracleSubgraph {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>>;
}

pub struct EpochBlockOracleSubgraphImpl {
logger: Logger,
endpoint: String,
client: Client,
}

impl EpochBlockOracleSubgraphImpl {
pub fn new(logger: Logger, endpoint: String) -> Arc<Self> {
Arc::new(EpochBlockOracleSubgraphImpl {
logger,
endpoint,
client: Client::new(),
})
}
}

#[derive(Serialize)]
struct GraphqlRequest {
query: String,
variables: BTreeMap<String, serde_json::Value>,
}

#[derive(Deserialize)]
struct GraphqlResponse {
data: Option<BTreeMap<String, serde_json::Value>>,
errors: Option<Vec<serde_json::Value>>,
}

const SUPPORTED_NETWORKS_QUERY: &str = r#"
query Networks($skip: Int!) {
networks(first: 1000, skip: $skip) {
id
alias
}
}
"#;

impl EpochBlockOracleSubgraph for EpochBlockOracleSubgraphImpl {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
stream::iter((0..).step_by(1000))
.then(move |skip| {
let this = self.clone();
async move {
let req = GraphqlRequest {
query: SUPPORTED_NETWORKS_QUERY.to_string(),
variables: vec![("skip".to_string(), skip.into())]
.into_iter()
.collect(),
};

let res: GraphqlResponse = this
.client
.post(&this.endpoint)
.json(&req)
.send()
.await?
.error_for_status()?
.json()
.await?;

if let Some(errs) = res.errors.filter(|errs| !errs.is_empty()) {
return Err(anyhow!(
"error querying supported networks from subgraph {}",
serde_json::to_string(&errs)?
));
}

let data = res
.data
.ok_or_else(|| anyhow!("Data field is missing in the response"))?
.remove("networks")
.ok_or_else(|| anyhow!("'networks' field is missing in the data"))?;

#[derive(Deserialize)]
#[allow(non_snake_case)]
struct RawNetwork {
id: String,
alias: String,
}

let page: Vec<RawNetwork> = serde_json::from_value(data)?;
let page: Vec<String> = page
.into_iter()
.flat_map(|raw_network| vec![raw_network.id, raw_network.alias])
.collect();

trace!(this.logger, "networks page"; "page_size" => page.len());

Ok(page)
}
})
.take_while(|networks| {
let keep_paginating = match networks {
Ok(networks) => !networks.is_empty(),
Err(_) => true,
};
async move { keep_paginating }
})
.map_ok(|networks| stream::iter(networks.into_iter().map(Ok)))
.try_flatten()
.boxed()
}
}
40 changes: 28 additions & 12 deletions availability-oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod contract;
mod epoch_block_oracle_subgraph;
mod ipfs;
mod manifest;
mod network_subgraph;
Expand All @@ -8,6 +9,7 @@ mod util;
use common::prelude::*;
use common::prometheus;
use contract::*;
use epoch_block_oracle_subgraph::{EpochBlockOracleSubgraph, EpochBlockOracleSubgraphImpl};
use ethers::abi::Address;
use ethers::signers::LocalWallet;
use ethers::signers::Signer;
Expand Down Expand Up @@ -105,14 +107,11 @@ struct Config {
metrics_port: u16,

#[structopt(
short,
long,
default_value = "mainnet",
value_delimiter = ",",
env = "SUPPORTED_NETWORKS",
help = "a comma separated list of the supported network ids"
env = "EPOCH_BLOCK_ORACLE_SUBGRAPH",
help = "Graphql endpoint to the epoch block oracle subgraph"
)]
supported_networks: Vec<String>,
epoch_block_oracle_subgraph: String,

// Note: `ethereum/contract` is a valid alias for `ethereum`
#[structopt(
Expand Down Expand Up @@ -157,6 +156,8 @@ async fn main() -> Result<()> {
async fn run(logger: Logger, config: Config) -> Result<()> {
let ipfs = IpfsImpl::new(config.ipfs, config.ipfs_concurrency, config.ipfs_timeout);
let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph);
let epoch_subgraph =
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
let contract: Box<dyn StateManager> = if config.dry_run {
Box::new(StateManagerDryRun::new(logger.clone()))
} else {
Expand Down Expand Up @@ -194,7 +195,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
subgraph.clone(),
config.min_signal,
grace_period,
&config.supported_networks,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
)
.await
Expand Down Expand Up @@ -227,7 +228,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
subgraph,
config.min_signal,
grace_period,
&config.supported_networks,
epoch_subgraph.clone(),
&config.supported_data_source_kinds,
)
.await
Expand Down Expand Up @@ -278,18 +279,33 @@ pub async fn reconcile_deny_list(
subgraph: Arc<impl NetworkSubgraph>,
min_signal: u64,
grace_period: Duration,
supported_network_ids: &[String],
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
supported_ds_kinds: &[String],
) -> Result<(), Error> {
let logger = logger.clone();

// Fetch supported networks
let mut supported_networks = Vec::new();
let networks_stream = epoch_subgraph.supported_networks();
futures::pin_mut!(networks_stream);
while let Some(network) = networks_stream.next().await {
match network {
Ok(network_id) => supported_networks.push(network_id),
Err(e) => Err(e)?,
}
}

info!(logger, "Supported networks";
"alias" => supported_networks.join(", ")
);

// Check the availability status of all subgraphs, and gather which should flip the deny flag.
let status_changes: Vec<([u8; 32], bool)> = subgraph
.deployments_over_threshold(min_signal, grace_period)
.map(|deployment| async {
let deployment = deployment?;
let id = bytes32_to_cid_v0(deployment.id);
let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await {
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),
Expand Down Expand Up @@ -419,7 +435,7 @@ impl From<Invalid> for CheckError {
async fn check(
ipfs: &impl Ipfs,
deployment_id: Cid,
supported_network_ids: &[String],
supported_networks: &[String],
supported_ds_kinds: &[String],
) -> Result<(), CheckError> {
fn check_link(file: &manifest::Link) -> Result<Cid, Invalid> {
Expand Down Expand Up @@ -483,7 +499,7 @@ async fn check(
// - That network is listed in the `supported_networks` list
match (network, ds_network) {
(None, Some(ds_network)) => {
if !supported_network_ids.contains(ds_network) {
if !supported_networks.contains(ds_network) {
return Err(Invalid::UnsupportedNetwork(ds_network.clone()).into());
}
network = Some(ds_network)
Expand Down
11 changes: 10 additions & 1 deletion availability-oracle/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::contract;
use crate::epoch_block_oracle_subgraph::*;
use crate::ipfs::*;
use crate::network_subgraph::*;
use crate::util::bytes32_to_cid_v0;
Expand Down Expand Up @@ -51,7 +52,7 @@ async fn test_reconcile() {
Arc::new(MockSubgraph),
0,
Duration::default(),
&vec!["mainnet".into()],
Arc::new(MockEBOSubgraph),
&vec![
"ethereum".into(),
"ethereum/contract".into(),
Expand Down Expand Up @@ -95,6 +96,14 @@ impl NetworkSubgraph for MockSubgraph {
}
}

struct MockEBOSubgraph;

impl EpochBlockOracleSubgraph for MockEBOSubgraph {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
futures::stream::iter(vec![Ok("mainnet".to_string())]).boxed()
}
}

struct MockIpfs;

#[async_trait]
Expand Down

0 comments on commit e3e7e49

Please sign in to comment.