Skip to content

Commit

Permalink
test(bridge withdrawer): remove unit tests and add blackbox tests (#1232
Browse files Browse the repository at this point in the history
)

## Summary
changes made in #1215 required adding a mock gRPC server. Moving to
black box tests was the next step planned so this will replace the
invalidated unit tests with valid black box tests.

## Background
Brief background on why these changes were made, ie "why?"

## Changes
* Add black box tests for native and erc20 sequencer and ics20
withdrawals
* test helpers for the mock cometbft server
* test helpers for the mock sequencer gRPC server
* test helpers for managing anvil & ethereum node
* remove the "unit tests" in `submitter/tests.rs`
* some logs i found useful to add while putting this together

## Testing
* native withdrawal to the sequencer works
* native withdrawal with ics20 works
* erc20 to the sequencer works
* erc20 with ics20 works

## Related Issues
Link any issues that are related, prefer full github links.

closes #1227

---------

Co-authored-by: Richard Janis Goldschmidt <[email protected]>
  • Loading branch information
itamarreif and SuperFluffy authored Jul 29, 2024
1 parent c19812d commit 8fa15ce
Show file tree
Hide file tree
Showing 13 changed files with 1,780 additions and 99 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/astria-bridge-withdrawer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur

[dev-dependencies]
astria-core = { path = "../astria-core", features = ["server", "test-utils"] }
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
] }
reqwest = { workspace = true, features = ["json"] }
tempfile = { workspace = true }
tendermint-rpc = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
wiremock = { workspace = true }

[build-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use ethers::{
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
info,
instrument,
warn,
};

Expand Down Expand Up @@ -99,38 +101,25 @@ pub(crate) struct Watcher {
state: Arc<State>,
}

struct FullyInitialized {
shutdown_token: CancellationToken,
submitter_handle: submitter::Handle,
state: Arc<State>,
provider: Arc<Provider<Ws>>,
action_fetcher: GetWithdrawalActions<Provider<Ws>>,
starting_rollup_height: u64,
}

impl Watcher {
pub(crate) async fn run(mut self) -> Result<()> {
let (provider, action_fetcher, next_rollup_block_height) = self
pub(crate) async fn run(self) -> Result<()> {
let fully_init = self
.startup()
.await
.wrap_err("watcher failed to start up")?;

let Self {
state,
shutdown_token,
submitter_handle,
..
} = self;

state.set_watcher_ready();
fully_init.state.set_watcher_ready();

tokio::select! {
res = watch_for_blocks(
provider,
action_fetcher,
next_rollup_block_height,
submitter_handle,
shutdown_token.clone(),
) => {
info!("block handler exited");
res.context("block handler exited")
}
() = shutdown_token.cancelled() => {
info!("watcher shutting down");
Ok(())
}
}
fully_init.run().await
}

/// Gets the startup data from the submitter and connects to the Ethereum node.
Expand All @@ -142,23 +131,39 @@ impl Watcher {
/// - If the fee asset ID provided in the config is not a valid fee asset on the sequencer.
/// - If the Ethereum node cannot be connected to after several retries.
/// - If the asset withdrawal decimals cannot be fetched.
async fn startup(
&mut self,
) -> eyre::Result<(Arc<Provider<Ws>>, GetWithdrawalActions<Provider<Ws>>, u64)> {
#[instrument(skip_all, err)]
async fn startup(self) -> eyre::Result<FullyInitialized> {
let Self {
shutdown_token,
mut startup_handle,
submitter_handle,
contract_address,
ethereum_rpc_endpoint,
rollup_asset_denom,
bridge_address,
state,
} = self;

let startup::Info {
fee_asset,
starting_rollup_height,
..
} = select! {
() = self.shutdown_token.cancelled() => {
() = shutdown_token.cancelled() => {
return Err(eyre!("watcher received shutdown signal while waiting for startup"));
}

startup_info = self.startup_handle.get_info() => {
startup_info = startup_handle.get_info() => {
startup_info.wrap_err("failed to receive startup info")?
}
};

debug!(
fee_asset = %fee_asset,
starting_rollup_height = starting_rollup_height,
"received startup info"
);

// connect to eth node
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
Expand All @@ -179,7 +184,7 @@ impl Watcher {
);

let provider = tryhard::retry_fn(|| {
let url = self.ethereum_rpc_endpoint.clone();
let url = ethereum_rpc_endpoint.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
Expand All @@ -190,62 +195,79 @@ impl Watcher {
.wrap_err("failed connecting to rollup after several retries; giving up")?;

let provider = Arc::new(provider);
let ics20_asset_to_withdraw = if self.rollup_asset_denom.last_channel().is_some() {
let ics20_asset_to_withdraw = if rollup_asset_denom.last_channel().is_some() {
info!(
rollup_asset_denom = %self.rollup_asset_denom,
%rollup_asset_denom,
"configured rollup asset contains an ics20 channel; ics20 withdrawals will be emitted"
);
Some(self.rollup_asset_denom.clone())
Some(rollup_asset_denom.clone())
} else {
info!(
rollup_asset_denom = %self.rollup_asset_denom,
%rollup_asset_denom,
"configured rollup asset does not contain an ics20 channel; ics20 withdrawals will not be emitted"
);
None
};
let action_fetcher = GetWithdrawalActionsBuilder::new()
.provider(provider.clone())
.fee_asset(fee_asset)
.contract_address(self.contract_address)
.bridge_address(self.bridge_address)
.sequencer_asset_to_withdraw(self.rollup_asset_denom.clone().into())
.contract_address(contract_address)
.bridge_address(bridge_address)
.sequencer_asset_to_withdraw(rollup_asset_denom.clone().into())
.set_ics20_asset_to_withdraw(ics20_asset_to_withdraw)
.try_build()
.await
.wrap_err("failed to construct contract event to sequencer action fetcher")?;

self.state.set_watcher_ready();
Ok(FullyInitialized {
shutdown_token,
submitter_handle,
state,
provider,
action_fetcher,
starting_rollup_height,
})
}
}

Ok((provider.clone(), action_fetcher, starting_rollup_height))
impl FullyInitialized {
async fn run(self) -> eyre::Result<()> {
tokio::select! {
res = watch_for_blocks(
self.provider,
self.action_fetcher,
self.starting_rollup_height,
self.submitter_handle,
self.shutdown_token.clone(),
) => {
res.context("block handler exited")
}
() = self.shutdown_token.cancelled() => {
Ok(())
}
}
}
}

async fn sync_from_next_rollup_block_height(
#[instrument(skip_all, fields(from_rollup_height, to_rollup_height), err)]
async fn sync_unprocessed_rollup_heights(
provider: Arc<Provider<Ws>>,
action_fetcher: &GetWithdrawalActions<Provider<Ws>>,
submitter_handle: &submitter::Handle,
next_rollup_block_height_to_check: u64,
current_rollup_block_height: u64,
from_rollup_height: u64,
to_rollup_height: u64,
) -> Result<()> {
if current_rollup_block_height < next_rollup_block_height_to_check {
return Ok(());
}

for i in next_rollup_block_height_to_check..=current_rollup_block_height {
let Some(block) = provider
for i in from_rollup_height..=to_rollup_height {
let block = provider
.get_block(i)
.await
.wrap_err("failed to get block")?
else {
bail!("block with number {i} missing");
};

get_and_send_events_at_block(action_fetcher, block, submitter_handle)
.map_err(eyre::Report::new)
.and_then(|block| block.ok_or_eyre("block is missing"))
.wrap_err_with(|| format!("failed to get block at rollup height `{i}`"))?;
get_and_forward_block_events(action_fetcher, block, submitter_handle)
.await
.wrap_err("failed to get and send events at block")?;
}

info!("synced from {next_rollup_block_height_to_check} to {current_rollup_block_height}");
Ok(())
}

Expand All @@ -271,9 +293,15 @@ async fn watch_for_blocks(
bail!("current rollup block missing block number")
};

info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);

// sync any blocks missing between `next_rollup_block_height` and the current latest
// (inclusive).
sync_from_next_rollup_block_height(
sync_unprocessed_rollup_heights(
provider.clone(),
&action_fetcher,
&submitter_handle,
Expand All @@ -291,7 +319,7 @@ async fn watch_for_blocks(
}
block = block_rx.next() => {
if let Some(block) = block {
get_and_send_events_at_block(
get_and_forward_block_events(
&action_fetcher,
block,
&submitter_handle,
Expand All @@ -306,7 +334,11 @@ async fn watch_for_blocks(
}
}

async fn get_and_send_events_at_block(
#[instrument(skip_all, fields(
block.hash = block.hash.map(tracing::field::display),
block.number = block.number.map(tracing::field::display),
), err)]
async fn get_and_forward_block_events(
actions_fetcher: &GetWithdrawalActions<Provider<Ws>>,
block: Block<H256>,
submitter_handle: &submitter::Handle,
Expand All @@ -319,12 +351,7 @@ async fn get_and_send_events_at_block(
let actions = actions_fetcher
.get_for_block_hash(block_hash)
.await
.wrap_err_with(|| {
format!(
"failed getting actions for block; block hash: `{block_hash}`, block height: \
`{rollup_height}`"
)
})?;
.wrap_err("failed getting actions for block")?;

if actions.is_empty() {
info!(
Expand Down
10 changes: 7 additions & 3 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl BridgeWithdrawer {
Ok((service, shutdown_handle))
}

pub fn local_addr(&self) -> SocketAddr {
self.api_server.local_addr()
}

// Panic won't happen because `startup_task` is unwraped lazily after checking if it's `Some`.
#[allow(clippy::missing_panics_doc)]
pub async fn run(self) {
Expand Down Expand Up @@ -346,12 +350,12 @@ impl Shutdown {
.await
.map(flatten_result)
{
Ok(Ok(())) => info!("withdrawer exited gracefully"),
Ok(Err(error)) => error!(%error, "withdrawer exited with an error"),
Ok(Ok(())) => info!("submitter exited gracefully"),
Ok(Err(error)) => error!(%error, "submitter exited with an error"),
Err(_) => {
error!(
timeout_secs = limit.as_secs(),
"watcher did not shut down within timeout; killing it"
"submitter did not shut down within timeout; killing it"
);
submitter_task.abort();
}
Expand Down
Loading

0 comments on commit 8fa15ce

Please sign in to comment.