Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle null blocks #1

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ jobs:

integration-tests:
name: Run integration tests
if: false
runs-on: ubuntu-latest
timeout-minutes: 60
services:
Expand Down Expand Up @@ -222,4 +223,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --release
args: --release
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Graph Node


[![Build Status](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml/badge.svg)](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml?query=branch%3Amaster)
[![Getting Started Docs](https://img.shields.io/badge/docs-getting--started-brightgreen.svg)](docs/getting-started.md)

Expand Down
5 changes: 4 additions & 1 deletion chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use graph::anyhow;
use graph::blockchain::client::ChainClient;
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
Expand Down Expand Up @@ -188,7 +190,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down Expand Up @@ -237,6 +239,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand Down
5 changes: 4 additions & 1 deletion chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::BlockIngestor;
use graph::env::EnvVars;
Expand Down Expand Up @@ -183,6 +185,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}
Expand All @@ -192,7 +195,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down
23 changes: 15 additions & 8 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use anyhow::Error;
use ethabi::{Error as ABIError, Function, ParamType, Token};
use futures::Future;
Expand Down Expand Up @@ -932,14 +934,8 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block: LightEthereumBlock,
) -> Pin<Box<dyn std::future::Future<Output = Result<EthereumBlock, bc::IngestorError>> + Send>>;

/// Load block pointer for the specified `block number`.
fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = bc::IngestorError> + Send>;

/// Find a block by its number, according to the Ethereum node.
/// Find a block by its number, according to the Ethereum node. If `retries` is passed, limits
/// the number of attempts.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
Expand All @@ -954,6 +950,17 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block_number: BlockNumber,
) -> Box<dyn Future<Item = Option<H256>, Error = Error> + Send>;

/// Finds the hash and number of the lowest non-null block with height greater than or equal to
/// the given number.
///
/// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must
/// also be considered for the resolved block, in case it is higher than the requested number.
async fn nearest_block_hash_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error>;

/// Call the function of a smart contract.
fn contract_call(
&self,
Expand Down
13 changes: 8 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use anyhow::{anyhow, bail, Result};
use anyhow::{Context, Error};
use graph::blockchain::client::ChainClient;
Expand Down Expand Up @@ -424,9 +426,9 @@ impl Blockchain for Chain {
.clone();

adapter
.block_pointer_from_number(logger, number)
.compat()
.nearest_block_hash_to_number(logger, number)
.await
.map_err(From::from)
}
}
}
Expand Down Expand Up @@ -617,7 +619,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
from: BlockNumber,
to: BlockNumber,
filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.logger.clone(),
Expand Down Expand Up @@ -651,7 +653,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let block_number = block.number() as BlockNumber;
let blocks = blocks_with_triggers(
let (blocks, _) = blocks_with_triggers(
adapter,
logger.clone(),
self.chain_store.clone(),
Expand Down Expand Up @@ -691,11 +693,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
ptr: BlockPtr,
offset: BlockNumber,
root: Option<BlockHash>,
) -> Result<Option<BlockFinality>, Error> {
let block: Option<EthereumBlock> = self
.chain_store
.cheap_clone()
.ancestor_block(ptr, offset)
.ancestor_block(ptr, offset, root)
.await?
.map(json::from_value)
.transpose()?;
Expand Down
143 changes: 91 additions & 52 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use futures::future;
use futures::prelude::*;
use futures03::{future::BoxFuture, stream::FuturesUnordered};
Expand Down Expand Up @@ -42,6 +44,7 @@ use std::iter::FromIterator;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
// use back_to_the_future::futures_await;

use crate::adapter::ProviderStatus;
use crate::chain::BlockFinality;
Expand Down Expand Up @@ -613,6 +616,7 @@ impl EthereumAdapter {
stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
let web3 = web3.clone();
retry(format!("load block ptr {}", block_num), &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand All @@ -632,8 +636,16 @@ impl EthereumAdapter {
.boxed()
.compat()
.from_err()
.then(|res| {
if detect_null_block(&res) {
Ok(None)
} else {
Some(res).transpose()
}
})
}))
.buffered(ENV_VARS.block_batch_size)
.filter_map(|b| b)
.map(|b| b.into())
}

Expand All @@ -652,13 +664,12 @@ impl EthereumAdapter {
logger: &Logger,
block_ptr: BlockPtr,
) -> Result<bool, Error> {
let block_hash = self
.block_hash_by_block_number(logger, block_ptr.number)
.compat()
// TODO: This considers null blocks, but we could instead bail if we encounter one as a
// small optimization.
let canonical_block = self
.nearest_block_hash_to_number(logger, block_ptr.number)
.await?;
block_hash
.ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number))
.map(|block_hash| block_hash == block_ptr.hash_as_h256())
Ok(canonical_block == block_ptr)
}

pub(crate) fn logs_in_block_range(
Expand Down Expand Up @@ -901,6 +912,16 @@ impl EthereumAdapter {
}
}

// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific
// error returned when requesting such a null round. Ideally there should be a defined reponse or
// message for this case, or a check that is less dependent on the Filecoin implementation.
fn detect_null_block<T>(res: &Result<T, Error>) -> bool {
match res {
Ok(_) => false,
Err(e) => e.to_string().contains("requested epoch was a null round"),
}
}

#[async_trait]
impl EthereumAdapterTrait for EthereumAdapter {
fn provider(&self) -> &str {
Expand Down Expand Up @@ -1190,26 +1211,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
Box::pin(block_future)
}

fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = IngestorError> + Send> {
Box::new(
self.block_hash_by_block_number(logger, block_number)
.and_then(move |block_hash_opt| {
block_hash_opt.ok_or_else(|| {
anyhow!(
"Ethereum node could not find start block hash by block number {}",
&block_number
)
})
})
.from_err()
.map(move |block_hash| BlockPtr::from((block_hash, block_number))),
)
}

fn block_hash_by_block_number(
&self,
logger: &Logger,
Expand Down Expand Up @@ -1247,6 +1248,54 @@ impl EthereumAdapterTrait for EthereumAdapter {
)
}

async fn nearest_block_hash_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error> {
let mut next_number = block_number;
loop {
let retry_log_message = format!(
"eth_getBlockByNumber RPC call for block number {}",
next_number
);
let web3 = self.web3.clone();
let logger = logger.clone();
let res = retry(retry_log_message, &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let web3 = web3.cheap_clone();
async move {
web3.eth()
.block(BlockId::Number(next_number.into()))
.await
.map(|block_opt| block_opt.and_then(|block| block.hash))
.map_err(Error::from)
}
})
.await
.map_err(move |e| {
e.into_inner().unwrap_or_else(move || {
anyhow!(
"Ethereum node took too long to return data for block #{}",
next_number
)
})
});
if detect_null_block(&res) {
next_number += 1;
continue;
}
return match res {
Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)),
Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)),
Err(e) => Err(e),
};
}
}

fn contract_call(
&self,
logger: &Logger,
Expand Down Expand Up @@ -1396,9 +1445,10 @@ impl EthereumAdapterTrait for EthereumAdapter {
}
}

/// Returns blocks with triggers, corresponding to the specified range and filters.
/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
/// If a block contains no triggers, there may be no corresponding item in the stream.
/// However the `to` block will always be present, even if triggers are empty.
/// However the (resolved) `to` block will always be present, even if triggers are empty.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
Expand All @@ -1418,7 +1468,7 @@ pub(crate) async fn blocks_with_triggers(
to: BlockNumber,
filter: &TriggerFilter,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Vec<BlockWithTriggers<crate::Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
// Each trigger filter needs to be queried for the same block range
// and the blocks yielded need to be deduped. If any error occurs
// while searching for a trigger type, the entire operation fails.
Expand All @@ -1429,6 +1479,12 @@ pub(crate) async fn blocks_with_triggers(
let trigger_futs: FuturesUnordered<BoxFuture<Result<Vec<EthereumTrigger>, anyhow::Error>>> =
FuturesUnordered::new();

// Resolve the nearest non-null "to" block
debug!(logger, "Finding nearest valid `to` block to {}", to);

let to_ptr = eth.nearest_block_hash_to_number(&logger, to).await?;
let to_hash = to_ptr.hash_as_h256();
let to = to_ptr.block_number();
// This is for `start` triggers which can be initialization handlers which needs to be run
// before all other triggers
if filter.block.trigger_every_block {
Expand Down Expand Up @@ -1497,28 +1553,11 @@ pub(crate) async fn blocks_with_triggers(
trigger_futs.push(block_future)
}

// Get hash for "to" block
let to_hash_fut = eth
.block_hash_by_block_number(&logger, to)
.and_then(|hash| match hash {
Some(hash) => Ok(hash),
None => {
warn!(logger,
"Ethereum endpoint is behind";
"url" => eth.provider()
);
bail!("Block {} not found in the chain", to)
}
})
.compat();

// Join on triggers and block hash resolution
let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut);

// Unpack and handle possible errors in the previously joined futures
let triggers =
triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?;
let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?;
// Join on triggers, unpack and handle possible errors
let triggers = trigger_futs
.try_concat()
.await
.with_context(|| format!("Failed to obtain triggers for block {}", to))?;

let mut block_hashes: HashSet<H256> =
triggers.iter().map(EthereumTrigger::block_hash).collect();
Expand Down Expand Up @@ -1583,7 +1622,7 @@ pub(crate) async fn blocks_with_triggers(
));
}

Ok(blocks)
Ok((blocks, to))
}

pub(crate) async fn get_calls(
Expand Down
Loading
Loading