Skip to content

Commit

Permalink
fix(composer)!: update to work with appside mempool (#1643)
Browse files Browse the repository at this point in the history
## Summary
Updates composer to work with the new appside mempool. New ABCI error
codes were added and now nonces can be stacked, so the submission logic
should use the GRPC `pending_nonce()` instead of the RPC
`get_latest_nonce()`.

## Background
New ABCI error codes were added in PR
#1515 which need to be handled
in the composer's submission logic.

New Error codes:
- `NONCE_TAKEN` -> try resubmitting with new top-of-pending. If this is
happening because the composer is out of funds, that is okay as the
composer has it's own fund monitoring logic
- `PARKED_FULL` -> normal error case ok
- `ACCOUNT_SIZE_LIMIT` -> normal error case ok 
- `ALREADY_PRESENT` -> normal error case ok, shouldn't be returned 

## Background
We changed the internals of the mempool which resulted in the users of
the mempool to have changes as well.

## Testing
Smoke tests

## Related Issues
closes #1633

---------

Co-authored-by: quasystaty <[email protected]>
  • Loading branch information
Lilyjjo and quasystaty1 authored Oct 14, 2024
1 parent 80b8cf9 commit acfd370
Show file tree
Hide file tree
Showing 18 changed files with 326 additions and 836 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/composer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.5
version: 0.1.6

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
4 changes: 3 additions & 1 deletion charts/composer/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ data:
ASTRIA_COMPOSER_API_LISTEN_ADDR: "0.0.0.0:{{ .Values.ports.healthApi }}"
ASTRIA_COMPOSER_GRPC_ADDR: "0.0.0.0:{{ .Values.ports.grpc }}"
ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.sequencerChainId . }}"
ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}"
ASTRIA_COMPOSER_ROLLUPS: "{{ include "composer.rollups" . }}"
ASTRIA_COMPOSER_PRIVATE_KEY_FILE: "/var/secrets/{{ .Values.config.privateKey.secret.filename }}"
ASTRIA_COMPOSER_MAX_BYTES_PER_BUNDLE: "{{ .Values.config.maxBytesPerBundle }}"
Expand All @@ -30,7 +29,10 @@ data:
OTEL_EXPORTER_OTLP_TRACE_HEADERS: "{{ tpl .Values.otel.traceHeaders . }}"
OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceName . }}"
{{- if not .Values.global.dev }}
ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}"
{{- else }}
ASTRIA_COMPOSER_SEQUENCER_ABCI_ENDPOINT: "{{ tpl .Values.config.sequencerRpc . }}"
ASTRIA_COMPOSER_SEQUENCER_GRPC_ENDPOINT: "{{ tpl .Values.config.sequencerGrpc . }}"
{{- end }}
---
{{- if not .Values.secretProvider.enabled }}
Expand Down
1 change: 1 addition & 0 deletions charts/composer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ config:
sequencerAddressPrefix: astria
sequencerNativeAssetBaseDenomination: "nria"
sequencerRpc: ""
sequencerGrpc: ""
sequencerChainId: ""
privateKey:
devContent: ""
Expand Down
6 changes: 3 additions & 3 deletions charts/evm-stack/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies:
version: 0.27.6
- name: composer
repository: file://../composer
version: 0.1.5
version: 0.1.6
- name: evm-faucet
repository: file://../evm-faucet
version: 0.1.2
Expand All @@ -20,5 +20,5 @@ dependencies:
- name: blockscout-stack
repository: https://blockscout.github.io/helm-charts
version: 1.6.2
digest: sha256:0428a6d56fd86c170e322ad79c7b5f87628b6187a1df5ad47ae7c2281b7f12da
generated: "2024-10-14T15:11:45.153501+02:00"
digest: sha256:80a70740a70f834b6ff6cdcfbb5f4a3504d6963f784ff678d1d52a7284b1dc20
generated: "2024-10-14T16:04:40.995885+02:00"
4 changes: 2 additions & 2 deletions charts/evm-stack/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.6.3
version: 0.6.4

dependencies:
- name: celestia-node
Expand All @@ -26,7 +26,7 @@ dependencies:
version: 0.27.6
repository: "file://../evm-rollup"
- name: composer
version: 0.1.5
version: 0.1.6
repository: "file://../composer"
condition: composer.enabled
- name: evm-faucet
Expand Down
7 changes: 6 additions & 1 deletion crates/astria-composer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ name = "astria-composer"

[dependencies]
astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["serde", "server"] }
astria-core = { path = "../astria-core", features = [
"client",
"serde",
"server",
] }
astria-eyre = { path = "../astria-eyre" }
config = { package = "astria-config", path = "../astria-config" }
telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [
Expand Down Expand Up @@ -59,6 +63,7 @@ path = "../astria-sequencer-client"
features = ["http"]

[dev-dependencies]
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
] }
Expand Down
7 changes: 5 additions & 2 deletions crates/astria-composer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ NO_COLOR=
# Address of the API server
ASTRIA_COMPOSER_API_LISTEN_ADDR="0.0.0.0:0"

# Address of the RPC server for the sequencer chain
ASTRIA_COMPOSER_SEQUENCER_URL="http://127.0.0.1:26657"
# Address of the ABCI server for the sequencer chain
ASTRIA_COMPOSER_SEQUENCER_ABCI_ENDPOINT="http://127.0.0.1:26657"

# Address of the gRPC server for the sequencer chain
ASTRIA_COMPOSER_SEQUENCER_GRPC_ENDPOINT="http://127.0.0.1:8080"

# Chain ID of the sequencer chain which transactions are submitted to.
ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID="astria-dev-1"
Expand Down
3 changes: 2 additions & 1 deletion crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl Composer {
let shutdown_token = CancellationToken::new();

let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_abci_endpoint: cfg.sequencer_abci_endpoint.clone(),
sequencer_grpc_endpoint: cfg.sequencer_grpc_endpoint.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
private_key_file: cfg.private_key_file.clone(),
sequencer_address_prefix: cfg.sequencer_address_prefix.clone(),
Expand Down
7 changes: 5 additions & 2 deletions crates/astria-composer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ pub struct Config {
/// Address of the API server
pub api_listen_addr: SocketAddr,

/// Address of the RPC server for the sequencer chain
pub sequencer_url: String,
/// Address of the ABCI server for the sequencer chain
pub sequencer_abci_endpoint: String,

/// Address of the GRPC server for the sequencer chain
pub sequencer_grpc_endpoint: String,

/// The chain ID of the sequencer chain
pub sequencer_chain_id: String,
Expand Down
31 changes: 26 additions & 5 deletions crates/astria-composer/src/executor/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use astria_core::{
crypto::SigningKey,
generated::sequencerblock::v1alpha1::sequencer_service_client::SequencerServiceClient,
primitive::v1::Address,
protocol::transaction::v1alpha1::action::Sequence,
};
Expand All @@ -24,7 +25,8 @@ use crate::{
};

pub(crate) struct Builder {
pub(crate) sequencer_url: String,
pub(crate) sequencer_abci_endpoint: String,
pub(crate) sequencer_grpc_endpoint: String,
pub(crate) sequencer_chain_id: String,
pub(crate) private_key_file: String,
pub(crate) sequencer_address_prefix: String,
Expand All @@ -38,7 +40,8 @@ pub(crate) struct Builder {
impl Builder {
pub(crate) fn build(self) -> eyre::Result<(super::Executor, executor::Handle)> {
let Self {
sequencer_url,
sequencer_abci_endpoint,
sequencer_grpc_endpoint,
sequencer_chain_id,
private_key_file,
sequencer_address_prefix,
Expand All @@ -48,8 +51,14 @@ impl Builder {
shutdown_token,
metrics,
} = self;
let sequencer_client = sequencer_client::HttpClient::new(sequencer_url.as_str())
.wrap_err("failed constructing sequencer client")?;
let abci_client = sequencer_client::HttpClient::new(sequencer_abci_endpoint.as_str())
.wrap_err("failed constructing sequencer http client")?;

let grpc_client =
connect_sequencer_grpc(sequencer_grpc_endpoint.as_str()).wrap_err_with(|| {
format!("failed to connect to sequencer over gRPC at `{sequencer_grpc_endpoint}`")
})?;

let (status, _) = watch::channel(Status::new());

let sequencer_key = read_signing_key_from_file(&private_key_file).wrap_err_with(|| {
Expand All @@ -69,7 +78,8 @@ impl Builder {
super::Executor {
status,
serialized_rollup_transactions: serialized_rollup_transaction_rx,
sequencer_client,
abci_client,
grpc_client,
sequencer_chain_id,
sequencer_key,
address: sequencer_address,
Expand All @@ -91,3 +101,14 @@ fn read_signing_key_from_file<P: AsRef<Path>>(path: P) -> eyre::Result<SigningKe
.map_err(|_| eyre!("invalid private key length; must be 32 bytes"))?;
Ok(SigningKey::from(private_key_bytes))
}

fn connect_sequencer_grpc(
grpc_endpoint: &str,
) -> eyre::Result<SequencerServiceClient<tonic::transport::Channel>> {
let uri: tonic::transport::Uri = grpc_endpoint
.parse()
.wrap_err("failed to parse endpoint as URI")?;
Ok(SequencerServiceClient::new(
tonic::transport::Endpoint::from(uri).connect_lazy(),
))
}
73 changes: 45 additions & 28 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ use std::{

use astria_core::{
crypto::SigningKey,
generated::sequencerblock::v1alpha1::{
sequencer_service_client::{
self,
SequencerServiceClient,
},
GetPendingNonceRequest,
},
protocol::{
abci::AbciErrorCode,
transaction::v1alpha1::{
Expand Down Expand Up @@ -64,6 +71,7 @@ use tokio::{
},
};
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tracing::{
debug,
error,
Expand All @@ -89,8 +97,6 @@ use crate::{
mod bundle_factory;

pub(crate) mod builder;
#[cfg(test)]
mod tests;

pub(crate) use builder::Builder;

Expand Down Expand Up @@ -122,8 +128,10 @@ pub(super) struct Executor {
// Channel for receiving `SequenceAction`s to be bundled.
serialized_rollup_transactions: mpsc::Receiver<Sequence>,
// The client for submitting wrapped and signed pending eth transactions to the astria
// sequencer.
sequencer_client: sequencer_client::HttpClient,
// sequencer via the ABCI client.
abci_client: sequencer_client::HttpClient,
// The grpc client for grabbing the latest nonce from.
grpc_client: sequencer_service_client::SequencerServiceClient<Channel>,
// The chain id used for submission of transactions to the sequencer.
sequencer_chain_id: String,
// Private key used to sign sequencer transactions
Expand Down Expand Up @@ -197,7 +205,8 @@ impl Executor {
metrics: &'static Metrics,
) -> Fuse<Instrumented<SubmitFut>> {
SubmitFut {
client: self.sequencer_client.clone(),
abci_client: self.abci_client.clone(),
grpc_client: self.grpc_client.clone(),
address: self.address,
nonce,
chain_id: self.sequencer_chain_id.clone(),
Expand Down Expand Up @@ -332,7 +341,7 @@ impl Executor {
self.ensure_chain_id_is_correct()
.await
.wrap_err("failed to validate chain id")?;
let nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics)
let nonce = get_pending_nonce(self.grpc_client.clone(), self.address, self.metrics)
.await
.wrap_err("failed getting initial nonce from sequencer")?;
Ok(nonce)
Expand Down Expand Up @@ -378,10 +387,9 @@ impl Executor {
futures::future::ready(())
},
);
let client_genesis: tendermint::Genesis =
tryhard::retry_fn(|| self.sequencer_client.genesis())
.with_config(retry_config)
.await?;
let client_genesis: tendermint::Genesis = tryhard::retry_fn(|| self.abci_client.genesis())
.with_config(retry_config)
.await?;
Ok(client_genesis.chain_id)
}

Expand Down Expand Up @@ -460,23 +468,21 @@ impl Executor {
}
}

/// Queries the sequencer for the latest nonce with an exponential backoff
#[instrument(name = "get latest nonce", skip_all, fields(%address), err)]
async fn get_latest_nonce(
client: sequencer_client::HttpClient,
/// Queries the sequencer for the latest pending nonce with an exponential backoff
#[instrument(name = "get pending nonce", skip_all, fields(%address), err)]
async fn get_pending_nonce(
client: sequencer_service_client::SequencerServiceClient<Channel>,
address: Address,
metrics: &Metrics,
) -> eyre::Result<u32> {
debug!("fetching latest nonce from sequencer");
debug!("fetching pending nonce from sequencer");
let span = Span::current();
let start = Instant::now();
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(200))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt,
next_delay: Option<Duration>,
err: &sequencer_client::extension_trait::Error| {
|attempt, next_delay: Option<Duration>, err: &tonic::Status| {
metrics.increment_nonce_fetch_failure_count();

let wait_duration = next_delay
Expand All @@ -493,14 +499,22 @@ async fn get_latest_nonce(
},
);
let res = tryhard::retry_fn(|| {
let client = client.clone();
let span = info_span!(parent: span.clone(), "attempt get nonce");
let mut client = client.clone();
let span = info_span!(parent: span.clone(), "attempt get pending nonce");
metrics.increment_nonce_fetch_count();
async move { client.get_latest_nonce(address).await.map(|rsp| rsp.nonce) }.instrument(span)
async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
.map(|rsp| rsp.into_inner().inner)
}
.instrument(span)
})
.with_config(retry_config)
.await
.wrap_err("failed getting latest nonce from sequencer after 1024 attempts");
.wrap_err("failed getting pending nonce from sequencer after 1024 attempts");

metrics.record_nonce_fetch_latency(start.elapsed());

Expand Down Expand Up @@ -637,7 +651,8 @@ pin_project! {
/// If the sequencer returned a non-zero abci code (albeit not `INVALID_NONCE`), this future will return with
/// that nonce it used to submit the non-zero abci code request.
struct SubmitFut {
client: sequencer_client::HttpClient,
abci_client: sequencer_client::HttpClient,
grpc_client: SequencerServiceClient<tonic::transport::Channel>,
address: Address,
chain_id: String,
nonce: u32,
Expand Down Expand Up @@ -670,6 +685,8 @@ impl Future for SubmitFut {
// FIXME (https://github.com/astriaorg/astria/issues/1572): This function is too long and should be refactored.
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
const INVALID_NONCE: Code = Code::Err(AbciErrorCode::INVALID_NONCE.value());
const NONCE_TAKEN: Code = Code::Err(AbciErrorCode::NONCE_TAKEN.value());

loop {
let this = self.as_mut().project();

Expand All @@ -686,7 +703,7 @@ impl Future for SubmitFut {
"submitting transaction to sequencer",
);
SubmitState::WaitingForSend {
fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(),
fut: submit_tx(this.abci_client.clone(), tx, self.metrics).boxed(),
}
}

Expand All @@ -708,14 +725,14 @@ impl Future for SubmitFut {
.checked_add(1)
.expect("nonce should not overflow")));
}
INVALID_NONCE => {
INVALID_NONCE | NONCE_TAKEN => {
info!(
"sequencer rejected transaction due to invalid nonce; fetching \
new nonce"
);
SubmitState::WaitingForNonce {
fut: get_latest_nonce(
this.client.clone(),
fut: get_pending_nonce(
this.grpc_client.clone(),
*this.address,
self.metrics,
)
Expand Down Expand Up @@ -759,7 +776,7 @@ impl Future for SubmitFut {
"resubmitting transaction to sequencer with new nonce",
);
SubmitState::WaitingForSend {
fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(),
fut: submit_tx(this.abci_client.clone(), tx, self.metrics).boxed(),
}
}
Err(error) => {
Expand Down
Loading

0 comments on commit acfd370

Please sign in to comment.