Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(composer)!: update to work with appside mempool #1643

Merged
merged 21 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/composer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.5
version: 0.1.6

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
4 changes: 3 additions & 1 deletion charts/composer/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ data:
ASTRIA_COMPOSER_API_LISTEN_ADDR: "0.0.0.0:{{ .Values.ports.healthApi }}"
ASTRIA_COMPOSER_GRPC_ADDR: "0.0.0.0:{{ .Values.ports.grpc }}"
ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.sequencerChainId . }}"
ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}"
ASTRIA_COMPOSER_ROLLUPS: "{{ include "composer.rollups" . }}"
ASTRIA_COMPOSER_PRIVATE_KEY_FILE: "/var/secrets/{{ .Values.config.privateKey.secret.filename }}"
ASTRIA_COMPOSER_MAX_BYTES_PER_BUNDLE: "{{ .Values.config.maxBytesPerBundle }}"
Expand All @@ -30,7 +29,10 @@ data:
OTEL_EXPORTER_OTLP_TRACE_HEADERS: "{{ tpl .Values.otel.traceHeaders . }}"
OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceName . }}"
{{- if not .Values.global.dev }}
Lilyjjo marked this conversation as resolved.
Show resolved Hide resolved
ASTRIA_COMPOSER_SEQUENCER_URL: "{{ tpl .Values.config.sequencerRpc . }}"
{{- else }}
ASTRIA_COMPOSER_SEQUENCER_ABCI_ENDPOINT: "{{ tpl .Values.config.sequencerRpc . }}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when .Values.global.dev sets to true it marks a breaking change to configs. Therefore, new env variables should be added to else bracket while values being removed should be added to {{- if not .Values.global.dev }} bracket.

ASTRIA_COMPOSER_SEQUENCER_GRPC_ENDPOINT: "{{ tpl .Values.config.sequencerGrpc . }}"
{{- end }}
---
{{- if not .Values.secretProvider.enabled }}
Expand Down
1 change: 1 addition & 0 deletions charts/composer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ config:
sequencerAddressPrefix: astria
sequencerNativeAssetBaseDenomination: "nria"
sequencerRpc: ""
sequencerGrpc: ""
sequencerChainId: ""
privateKey:
devContent: ""
Expand Down
6 changes: 3 additions & 3 deletions charts/evm-stack/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies:
version: 0.27.6
- name: composer
repository: file://../composer
version: 0.1.5
version: 0.1.6
- name: evm-faucet
repository: file://../evm-faucet
version: 0.1.2
Expand All @@ -20,5 +20,5 @@ dependencies:
- name: blockscout-stack
repository: https://blockscout.github.io/helm-charts
version: 1.6.2
digest: sha256:0428a6d56fd86c170e322ad79c7b5f87628b6187a1df5ad47ae7c2281b7f12da
generated: "2024-10-14T15:11:45.153501+02:00"
digest: sha256:80a70740a70f834b6ff6cdcfbb5f4a3504d6963f784ff678d1d52a7284b1dc20
generated: "2024-10-14T16:04:40.995885+02:00"
4 changes: 2 additions & 2 deletions charts/evm-stack/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.6.3
version: 0.6.4

dependencies:
- name: celestia-node
Expand All @@ -26,7 +26,7 @@ dependencies:
version: 0.27.6
repository: "file://../evm-rollup"
- name: composer
version: 0.1.5
version: 0.1.6
repository: "file://../composer"
condition: composer.enabled
- name: evm-faucet
Expand Down
7 changes: 6 additions & 1 deletion crates/astria-composer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ name = "astria-composer"

[dependencies]
astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["serde", "server"] }
astria-core = { path = "../astria-core", features = [
"client",
"serde",
"server",
] }
astria-eyre = { path = "../astria-eyre" }
config = { package = "astria-config", path = "../astria-config" }
telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [
Expand Down Expand Up @@ -59,6 +63,7 @@ path = "../astria-sequencer-client"
features = ["http"]

[dev-dependencies]
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
] }
Expand Down
7 changes: 5 additions & 2 deletions crates/astria-composer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ NO_COLOR=
# Address of the API server
ASTRIA_COMPOSER_API_LISTEN_ADDR="0.0.0.0:0"

# Address of the RPC server for the sequencer chain
ASTRIA_COMPOSER_SEQUENCER_URL="http://127.0.0.1:26657"
# Address of the ABCI server for the sequencer chain
ASTRIA_COMPOSER_SEQUENCER_ABCI_ENDPOINT="http://127.0.0.1:26657"

# Address of the gRPC server for the sequencer chain
ASTRIA_COMPOSER_SEQUENCER_GRPC_ENDPOINT="http://127.0.0.1:8080"

# Chain ID of the sequencer chain which transactions are submitted to.
ASTRIA_COMPOSER_SEQUENCER_CHAIN_ID="astria-dev-1"
Expand Down
3 changes: 2 additions & 1 deletion crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl Composer {
let shutdown_token = CancellationToken::new();

let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_abci_endpoint: cfg.sequencer_abci_endpoint.clone(),
sequencer_grpc_endpoint: cfg.sequencer_grpc_endpoint.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
private_key_file: cfg.private_key_file.clone(),
sequencer_address_prefix: cfg.sequencer_address_prefix.clone(),
Expand Down
7 changes: 5 additions & 2 deletions crates/astria-composer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ pub struct Config {
/// Address of the API server
pub api_listen_addr: SocketAddr,

/// Address of the RPC server for the sequencer chain
pub sequencer_url: String,
/// Address of the ABCI server for the sequencer chain
pub sequencer_abci_endpoint: String,

/// Address of the GRPC server for the sequencer chain
pub sequencer_grpc_endpoint: String,

/// The chain ID of the sequencer chain
pub sequencer_chain_id: String,
Expand Down
31 changes: 26 additions & 5 deletions crates/astria-composer/src/executor/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use astria_core::{
crypto::SigningKey,
generated::sequencerblock::v1alpha1::sequencer_service_client::SequencerServiceClient,
primitive::v1::Address,
protocol::transaction::v1alpha1::action::Sequence,
};
Expand All @@ -24,7 +25,8 @@ use crate::{
};

pub(crate) struct Builder {
pub(crate) sequencer_url: String,
pub(crate) sequencer_abci_endpoint: String,
pub(crate) sequencer_grpc_endpoint: String,
pub(crate) sequencer_chain_id: String,
pub(crate) private_key_file: String,
pub(crate) sequencer_address_prefix: String,
Expand All @@ -38,7 +40,8 @@ pub(crate) struct Builder {
impl Builder {
pub(crate) fn build(self) -> eyre::Result<(super::Executor, executor::Handle)> {
let Self {
sequencer_url,
sequencer_abci_endpoint,
sequencer_grpc_endpoint,
sequencer_chain_id,
private_key_file,
sequencer_address_prefix,
Expand All @@ -48,8 +51,14 @@ impl Builder {
shutdown_token,
metrics,
} = self;
let sequencer_client = sequencer_client::HttpClient::new(sequencer_url.as_str())
.wrap_err("failed constructing sequencer client")?;
let abci_client = sequencer_client::HttpClient::new(sequencer_abci_endpoint.as_str())
.wrap_err("failed constructing sequencer http client")?;

let grpc_client =
connect_sequencer_grpc(sequencer_grpc_endpoint.as_str()).wrap_err_with(|| {
format!("failed to connect to sequencer over gRPC at `{sequencer_grpc_endpoint}`")
})?;

let (status, _) = watch::channel(Status::new());

let sequencer_key = read_signing_key_from_file(&private_key_file).wrap_err_with(|| {
Expand All @@ -69,7 +78,8 @@ impl Builder {
super::Executor {
status,
serialized_rollup_transactions: serialized_rollup_transaction_rx,
sequencer_client,
abci_client,
grpc_client,
sequencer_chain_id,
sequencer_key,
address: sequencer_address,
Expand All @@ -91,3 +101,14 @@ fn read_signing_key_from_file<P: AsRef<Path>>(path: P) -> eyre::Result<SigningKe
.map_err(|_| eyre!("invalid private key length; must be 32 bytes"))?;
Ok(SigningKey::from(private_key_bytes))
}

fn connect_sequencer_grpc(
grpc_endpoint: &str,
) -> eyre::Result<SequencerServiceClient<tonic::transport::Channel>> {
let uri: tonic::transport::Uri = grpc_endpoint
.parse()
.wrap_err("failed to parse endpoint as URI")?;
Ok(SequencerServiceClient::new(
tonic::transport::Endpoint::from(uri).connect_lazy(),
))
}
73 changes: 45 additions & 28 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ use std::{

use astria_core::{
crypto::SigningKey,
generated::sequencerblock::v1alpha1::{
sequencer_service_client::{
self,
SequencerServiceClient,
},
GetPendingNonceRequest,
},
protocol::{
abci::AbciErrorCode,
transaction::v1alpha1::{
Expand Down Expand Up @@ -64,6 +71,7 @@ use tokio::{
},
};
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tracing::{
debug,
error,
Expand All @@ -89,8 +97,6 @@ use crate::{
mod bundle_factory;

pub(crate) mod builder;
#[cfg(test)]
mod tests;

pub(crate) use builder::Builder;

Expand Down Expand Up @@ -122,8 +128,10 @@ pub(super) struct Executor {
// Channel for receiving `SequenceAction`s to be bundled.
serialized_rollup_transactions: mpsc::Receiver<Sequence>,
// The client for submitting wrapped and signed pending eth transactions to the astria
// sequencer.
sequencer_client: sequencer_client::HttpClient,
// sequencer via the ABCI client.
abci_client: sequencer_client::HttpClient,
// The grpc client for grabbing the latest nonce from.
grpc_client: sequencer_service_client::SequencerServiceClient<Channel>,
// The chain id used for submission of transactions to the sequencer.
sequencer_chain_id: String,
// Private key used to sign sequencer transactions
Expand Down Expand Up @@ -197,7 +205,8 @@ impl Executor {
metrics: &'static Metrics,
) -> Fuse<Instrumented<SubmitFut>> {
SubmitFut {
client: self.sequencer_client.clone(),
abci_client: self.abci_client.clone(),
grpc_client: self.grpc_client.clone(),
address: self.address,
nonce,
chain_id: self.sequencer_chain_id.clone(),
Expand Down Expand Up @@ -332,7 +341,7 @@ impl Executor {
self.ensure_chain_id_is_correct()
.await
.wrap_err("failed to validate chain id")?;
let nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics)
let nonce = get_pending_nonce(self.grpc_client.clone(), self.address, self.metrics)
.await
.wrap_err("failed getting initial nonce from sequencer")?;
Ok(nonce)
Expand Down Expand Up @@ -378,10 +387,9 @@ impl Executor {
futures::future::ready(())
},
);
let client_genesis: tendermint::Genesis =
tryhard::retry_fn(|| self.sequencer_client.genesis())
.with_config(retry_config)
.await?;
let client_genesis: tendermint::Genesis = tryhard::retry_fn(|| self.abci_client.genesis())
.with_config(retry_config)
.await?;
Ok(client_genesis.chain_id)
}

Expand Down Expand Up @@ -460,23 +468,21 @@ impl Executor {
}
}

/// Queries the sequencer for the latest nonce with an exponential backoff
#[instrument(name = "get latest nonce", skip_all, fields(%address), err)]
async fn get_latest_nonce(
client: sequencer_client::HttpClient,
/// Queries the sequencer for the latest pending nonce with an exponential backoff
#[instrument(name = "get pending nonce", skip_all, fields(%address), err)]
async fn get_pending_nonce(
client: sequencer_service_client::SequencerServiceClient<Channel>,
address: Address,
metrics: &Metrics,
) -> eyre::Result<u32> {
debug!("fetching latest nonce from sequencer");
debug!("fetching pending nonce from sequencer");
let span = Span::current();
let start = Instant::now();
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(200))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt,
next_delay: Option<Duration>,
err: &sequencer_client::extension_trait::Error| {
|attempt, next_delay: Option<Duration>, err: &tonic::Status| {
metrics.increment_nonce_fetch_failure_count();

let wait_duration = next_delay
Expand All @@ -493,14 +499,22 @@ async fn get_latest_nonce(
},
);
let res = tryhard::retry_fn(|| {
let client = client.clone();
let span = info_span!(parent: span.clone(), "attempt get nonce");
let mut client = client.clone();
Lilyjjo marked this conversation as resolved.
Show resolved Hide resolved
let span = info_span!(parent: span.clone(), "attempt get pending nonce");
metrics.increment_nonce_fetch_count();
async move { client.get_latest_nonce(address).await.map(|rsp| rsp.nonce) }.instrument(span)
async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
.map(|rsp| rsp.into_inner().inner)
}
.instrument(span)
})
.with_config(retry_config)
.await
.wrap_err("failed getting latest nonce from sequencer after 1024 attempts");
.wrap_err("failed getting pending nonce from sequencer after 1024 attempts");

metrics.record_nonce_fetch_latency(start.elapsed());

Expand Down Expand Up @@ -637,7 +651,8 @@ pin_project! {
/// If the sequencer returned a non-zero abci code (albeit not `INVALID_NONCE`), this future will return with
/// that nonce it used to submit the non-zero abci code request.
struct SubmitFut {
client: sequencer_client::HttpClient,
abci_client: sequencer_client::HttpClient,
grpc_client: SequencerServiceClient<tonic::transport::Channel>,
address: Address,
chain_id: String,
nonce: u32,
Expand Down Expand Up @@ -670,6 +685,8 @@ impl Future for SubmitFut {
// FIXME (https://github.com/astriaorg/astria/issues/1572): This function is too long and should be refactored.
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
const INVALID_NONCE: Code = Code::Err(AbciErrorCode::INVALID_NONCE.value());
const NONCE_TAKEN: Code = Code::Err(AbciErrorCode::NONCE_TAKEN.value());

loop {
let this = self.as_mut().project();

Expand All @@ -686,7 +703,7 @@ impl Future for SubmitFut {
"submitting transaction to sequencer",
);
SubmitState::WaitingForSend {
fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(),
fut: submit_tx(this.abci_client.clone(), tx, self.metrics).boxed(),
}
}

Expand All @@ -708,14 +725,14 @@ impl Future for SubmitFut {
.checked_add(1)
.expect("nonce should not overflow")));
}
INVALID_NONCE => {
INVALID_NONCE | NONCE_TAKEN => {
info!(
"sequencer rejected transaction due to invalid nonce; fetching \
new nonce"
);
SubmitState::WaitingForNonce {
fut: get_latest_nonce(
this.client.clone(),
fut: get_pending_nonce(
this.grpc_client.clone(),
*this.address,
self.metrics,
)
Expand Down Expand Up @@ -759,7 +776,7 @@ impl Future for SubmitFut {
"resubmitting transaction to sequencer with new nonce",
);
SubmitState::WaitingForSend {
fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(),
fut: submit_tx(this.abci_client.clone(), tx, self.metrics).boxed(),
}
}
Err(error) => {
Expand Down
Loading
Loading