Skip to content

Commit

Permalink
Use the new Firehose info endpoint (#5672)
Browse files Browse the repository at this point in the history
* graph: refactor provider manager to support extended block checks

* firehose: use endpoint info

* node: add optional extended block checks for providers

* node: enable extended blocks checks for all chains by default

* graph: add provider check strategy to make api nicer

* graph: add tests for provider checks

* graph: prepare graphman cli integration & clean-up tests

* node: update graphman provider checks command

* graphman: enable extended blocks check on deployment run
  • Loading branch information
isum authored Nov 13, 2024
1 parent 4c49952 commit 9458fc5
Show file tree
Hide file tree
Showing 49 changed files with 2,934 additions and 1,496 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ anyhow = "1.0"
async-graphql = { version = "7.0.11", features = ["chrono", "uuid"] }
async-graphql-axum = "7.0.11"
axum = "0.7.5"
bs58 = "0.5.1"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
derivative = "2.2.0"
Expand Down
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use graph::blockchain::{
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -42,7 +42,7 @@ use graph::blockchain::block_stream::{

pub struct Chain {
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::{BlockIngestor, NoopDecoderHook};
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
Expand Down Expand Up @@ -37,7 +37,7 @@ use crate::{codec, TriggerFilter};

pub struct Chain {
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down
3 changes: 1 addition & 2 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Error;
use graph::{
endpoint::EndpointMetrics,
env::env_var,
firehose::{self, FirehoseEndpoint, NoopGenesisDecoder, SubgraphLimit},
firehose::{self, FirehoseEndpoint, SubgraphLimit},
log::logger,
prelude::{prost, tokio, tonic, MetricsRegistry},
};
Expand Down Expand Up @@ -38,7 +38,6 @@ async fn main() -> Result<(), Error> {
false,
SubgraphLimit::Unlimited,
metrics,
NoopGenesisDecoder::boxed(),
));

loop {
Expand Down
6 changes: 3 additions & 3 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transfor
use graph::blockchain::{
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
Expand Down Expand Up @@ -288,7 +288,7 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
pub name: ChainId,
pub name: ChainName,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
client: Arc<ChainClient<Self>>,
Expand All @@ -315,7 +315,7 @@ impl Chain {
/// Creates a new Ethereum [`Chain`].
pub fn new(
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
Expand Down
8 changes: 4 additions & 4 deletions chain/ethereum/src/ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{chain::BlockFinality, ENV_VARS};
use crate::{EthereumAdapter, EthereumAdapterTrait as _};
use graph::blockchain::client::ChainClient;
use graph::blockchain::BlockchainKind;
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::futures03::compat::Future01CompatExt as _;
use graph::slog::o;
use graph::util::backoff::ExponentialBackoff;
Expand All @@ -22,7 +22,7 @@ pub struct PollingBlockIngestor {
chain_client: Arc<ChainClient<crate::chain::Chain>>,
chain_store: Arc<dyn ChainStore>,
polling_interval: Duration,
network_name: ChainId,
network_name: ChainName,
}

impl PollingBlockIngestor {
Expand All @@ -32,7 +32,7 @@ impl PollingBlockIngestor {
chain_client: Arc<ChainClient<crate::chain::Chain>>,
chain_store: Arc<dyn ChainStore>,
polling_interval: Duration,
network_name: ChainId,
network_name: ChainName,
) -> Result<PollingBlockIngestor, Error> {
Ok(PollingBlockIngestor {
logger,
Expand Down Expand Up @@ -266,7 +266,7 @@ impl BlockIngestor for PollingBlockIngestor {
}
}

fn network_name(&self) -> ChainId {
fn network_name(&self) -> ChainName {
self.network_name.clone()
}

Expand Down
78 changes: 41 additions & 37 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use anyhow::{anyhow, bail};
use graph::blockchain::ChainIdentifier;
use graph::components::adapter::{ChainId, NetIdentifiable, ProviderManager, ProviderName};
use graph::components::network_provider::ChainName;
use graph::components::network_provider::NetworkDetails;
use graph::components::network_provider::ProviderManager;
use graph::components::network_provider::ProviderName;
use graph::endpoint::EndpointMetrics;
use graph::firehose::{AvailableCapacity, SubgraphLimit};
use graph::prelude::rand::seq::IteratorRandom;
use graph::prelude::rand::{self, Rng};
use itertools::Itertools;
use std::sync::Arc;

pub use graph::impl_slog_value;
Expand All @@ -29,13 +33,18 @@ pub struct EthereumNetworkAdapter {
}

#[async_trait]
impl NetIdentifiable for EthereumNetworkAdapter {
async fn net_identifiers(&self) -> Result<ChainIdentifier, anyhow::Error> {
self.adapter.net_identifiers().await
}
impl NetworkDetails for EthereumNetworkAdapter {
fn provider_name(&self) -> ProviderName {
self.adapter.provider().into()
}

async fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
self.adapter.net_identifiers().await
}

async fn provides_extended_blocks(&self) -> Result<bool, Error> {
Ok(true)
}
}

impl EthereumNetworkAdapter {
Expand Down Expand Up @@ -72,7 +81,7 @@ impl EthereumNetworkAdapter {

#[derive(Debug, Clone)]
pub struct EthereumNetworkAdapters {
chain_id: ChainId,
chain_id: ChainName,
manager: ProviderManager<EthereumNetworkAdapter>,
call_only_adapters: Vec<EthereumNetworkAdapter>,
// Percentage of request that should be used to retest errored adapters.
Expand All @@ -96,10 +105,10 @@ impl EthereumNetworkAdapters {
) -> Self {
use std::cmp::Ordering;

use graph::components::network_provider::ProviderCheckStrategy;
use graph::slog::{o, Discard, Logger};

use graph::components::adapter::NoopIdentValidator;
let chain_id: ChainId = "testing".into();
let chain_id: ChainName = "testing".into();
adapters.sort_by(|a, b| {
a.capabilities
.partial_cmp(&b.capabilities)
Expand All @@ -109,15 +118,14 @@ impl EthereumNetworkAdapters {
let provider = ProviderManager::new(
Logger::root(Discard, o!()),
vec![(chain_id.clone(), adapters)].into_iter(),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
provider.mark_all_valid().await;

Self::new(chain_id, provider, call_only, None)
}

pub fn new(
chain_id: ChainId,
chain_id: ChainName,
manager: ProviderManager<EthereumNetworkAdapter>,
call_only_adapters: Vec<EthereumNetworkAdapter>,
retest_percent: Option<f64>,
Expand Down Expand Up @@ -159,8 +167,9 @@ impl EthereumNetworkAdapters {
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
let all = self
.manager
.get_all(&self.chain_id)
.providers(&self.chain_id)
.await
.map(|adapters| adapters.collect_vec())
.unwrap_or_default();

Self::available_with_capabilities(all, required_capabilities)
Expand All @@ -172,7 +181,10 @@ impl EthereumNetworkAdapters {
&self,
required_capabilities: &NodeCapabilities,
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
let all = self.manager.get_all_unverified(&self.chain_id);
let all = self
.manager
.providers_unchecked(&self.chain_id)
.collect_vec();

Self::available_with_capabilities(all, required_capabilities)
}
Expand Down Expand Up @@ -242,10 +254,10 @@ impl EthereumNetworkAdapters {
// EthereumAdapters are sorted by their NodeCapabilities when the EthereumNetworks
// struct is instantiated so they do not need to be sorted here
self.manager
.get_all(&self.chain_id)
.providers(&self.chain_id)
.await
.map(|mut adapters| adapters.next())
.unwrap_or_default()
.first()
.map(|ethereum_network_adapter| ethereum_network_adapter.adapter.clone())
}

Expand Down Expand Up @@ -299,7 +311,9 @@ impl EthereumNetworkAdapters {
#[cfg(test)]
mod tests {
use graph::cheap_clone::CheapClone;
use graph::components::adapter::{NoopIdentValidator, ProviderManager, ProviderName};
use graph::components::network_provider::ProviderCheckStrategy;
use graph::components::network_provider::ProviderManager;
use graph::components::network_provider::ProviderName;
use graph::data::value::Word;
use graph::http::HeaderMap;
use graph::{
Expand Down Expand Up @@ -746,18 +760,14 @@ mod tests {
.collect(),
)]
.into_iter(),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let no_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager.cheap_clone(),
vec![],
Some(0f64),
);
let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(0f64));

let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id, manager.cheap_clone(), vec![], Some(1f64));
EthereumNetworkAdapters::new(chain_id, manager.clone(), vec![], Some(1f64));

assert_eq!(
no_retest_adapters
Expand Down Expand Up @@ -842,16 +852,12 @@ mod tests {
.iter()
.cloned()
.map(|a| (chain_id.clone(), vec![a])),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let always_retest_adapters = EthereumNetworkAdapters::new(
chain_id.clone(),
manager.cheap_clone(),
vec![],
Some(1f64),
);
let always_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager.clone(), vec![], Some(1f64));

assert_eq!(
always_retest_adapters
.cheapest_with(&NodeCapabilities {
Expand All @@ -870,9 +876,8 @@ mod tests {
.iter()
.cloned()
.map(|a| (chain_id.clone(), vec![a])),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let no_retest_adapters =
EthereumNetworkAdapters::new(chain_id.clone(), manager, vec![], Some(0f64));
Expand Down Expand Up @@ -912,9 +917,8 @@ mod tests {
no_available_adapter.iter().cloned().collect(),
)]
.into_iter(),
Arc::new(NoopIdentValidator),
ProviderCheckStrategy::MarkAsValid,
);
manager.mark_all_valid().await;

let no_available_adapter = EthereumNetworkAdapters::new(chain_id, manager, vec![], None);
let res = no_available_adapter
Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use graph::components::adapter::ProviderName;
use graph::components::network_provider::ProviderName;
use graph::endpoint::{EndpointMetrics, RequestLabels};
use jsonrpc_core::types::Call;
use jsonrpc_core::Value;
Expand Down
4 changes: 2 additions & 2 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use graph::blockchain::{
NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::network_provider::ChainName;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
name: ChainId,
name: ChainName,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down
Loading

0 comments on commit 9458fc5

Please sign in to comment.