Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
integrate gossipnet (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored May 11, 2023
1 parent ff6580f commit 6fb7ed0
Show file tree
Hide file tree
Showing 13 changed files with 2,963 additions and 158 deletions.
2,679 changes: 2,628 additions & 51 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ execution_rpc_url = "http://localhost:50051"

* run `cargo run`

* alternatively, you can do `cargo build && ./target/debug/conductor`.

* to connect directly to a node via p2p, you can use the `--bootnodes` flag, eg. `--bootnodes=/ip4/127.0.0.1/tcp/33900` or `--bootnodes=/ip4/127.0.0.1/tcp/34471/p2p/12D3KooWDCHwgGetpJuHknJqv2dNbYpe3LqgH8BKrsYHV9ALpAj8`.

### Running dependencies with podman

First, ensure your local `ConductorConfig.toml` matches the values below.
Expand Down
11 changes: 5 additions & 6 deletions astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ name = "astria-conductor"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
astria-execution-apis-rpc = { git = "https://github.com/astriaorg/astria-rpc", rev = "b67d6e1a7e61d2143ad395f9c1c3fa1f515a45ce" }
astria-execution-apis-rpc = { git = "https://github.com/astriaorg/astria-rpc", rev = "bf833e6990e8eaee31ab5ea6801ba2746c4df1ef" }
bech32 = "0.9"
clap = { version = "4.1.9", features = ["derive"] }
color-eyre = "0.6.2"
Expand All @@ -15,20 +13,21 @@ figment = { version = "0.10.8", features = ["toml", "env"] }
flexi_logger = "0.24.2"
futures = "0.3"
hex = "0.4"
gossipnet = { git = "https://github.com/astriaorg/gossipnet", features = [ "mdns" ], rev = "e916e4b0d4a0caa6fea68f721d4a4a8660a46af6" }
log = "0.4.17"
prost-types = "0.11.8"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
sequencer-relayer = { git = "https://github.com/astriaorg/sequencer-relayer", rev = "8fa5e01" }
sequencer-relayer-proto = { git = "https://github.com/astriaorg/sequencer-relayer", rev = "8fa5e01" }
sequencer-relayer = { git = "https://github.com/astriaorg/sequencer-relayer", rev = "8cd9b9e4268368c48ab3be02d9fa85b46914a32e" }
sequencer-relayer-proto = { git = "https://github.com/astriaorg/sequencer-relayer", rev = "8cd9b9e4268368c48ab3be02d9fa85b46914a32e" }
serde = "1.0.152"
serde_json = "1.0"
thiserror = "1.0.38"
tendermint = "0.30"
tendermint-proto = "0.30.0"
tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread", "signal"] }
tonic = "0.8.3"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tendermint-proto = "0.30.0"

[dependencies.rs-cnc]
git = "https://github.com/astriaorg/rs-cnc.git"
Expand Down
8 changes: 7 additions & 1 deletion astria-conductor/src/alert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ pub(crate) type AlertSender = UnboundedSender<Alert>;
/// The alerts that the driver may send the driver user.
#[derive(Debug)]
pub enum Alert {
/// Send when a block has been received from the gossip network.
BlockReceivedFromGossipNetwork {
/// The height of the block received
block_height: u64,
},

/// Send when a block has been received from the data layer.
BlockReceived {
BlockReceivedFromDataAvailability {
/// The height of the block received
block_height: u64,
},
Expand Down
65 changes: 59 additions & 6 deletions astria-conductor/src/bin/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use astria_conductor::{
logger,
};
use clap::Parser;
use color_eyre::eyre::Result;
use color_eyre::eyre::{
eyre,
Result,
};
use figment::{
providers::{
Env,
Expand Down Expand Up @@ -68,7 +71,8 @@ async fn run() -> Result<()> {

// spawn our driver
let (alert_tx, mut alert_rx) = mpsc::unbounded_channel();
let mut driver = Driver::new(conf, alert_tx).await?;
let (mut driver, executor_join_handle, reader_join_handle) =
Driver::new(conf, alert_tx.clone()).await?;
let driver_tx = driver.cmd_tx.clone();

tokio::task::spawn(async move {
Expand All @@ -77,8 +81,54 @@ async fn run() -> Result<()> {
}
});

// NOTE - this will most likely be replaced by an RPC server that will receive gossip
// messages from the sequencer
let executor_alert_tx = alert_tx.clone();
tokio::task::spawn(async move {
match executor_join_handle.await {
Ok(run_res) => match run_res {
Ok(_) => {
_ = executor_alert_tx.send(Alert::DriverError(eyre!(
"executor task exited unexpectedly"
)));
}
Err(e) => {
_ = executor_alert_tx.send(Alert::DriverError(eyre!(
"executor exited with error: {}",
e
)));
}
},
Err(e) => {
_ = executor_alert_tx.send(Alert::DriverError(eyre!(
"received JoinError from executor task: {}",
e
)));
}
}
});

if reader_join_handle.is_some() {
tokio::task::spawn(async move {
match reader_join_handle.unwrap().await {
Ok(run_res) => match run_res {
Ok(_) => {
_ = alert_tx
.send(Alert::DriverError(eyre!("reader task exited unexpectedly")));
}
Err(e) => {
_ = alert_tx
.send(Alert::DriverError(eyre!("reader exited with error: {}", e)));
}
},
Err(e) => {
_ = alert_tx.send(Alert::DriverError(eyre!(
"received JoinError from reader task: {}",
e
)));
}
}
});
}

let mut interval = time::interval(Duration::from_secs(3));

loop {
Expand Down Expand Up @@ -106,8 +156,11 @@ async fn run() -> Result<()> {
error!("error: {}", error_string);
break;
}
Alert::BlockReceived{block_height} => {
info!("block received from DA layer; DA layer height: {}", block_height);
Alert::BlockReceivedFromGossipNetwork{block_height} => {
info!("sequencer block received from p2p network; height: {}", block_height);
}
Alert::BlockReceivedFromDataAvailability{block_height} => {
info!("sequencer block received from DA layer; height: {}", block_height);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions astria-conductor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ pub struct Cli {
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub execution_rpc_url: Option<String>,

#[arg(long = "bootnodes")]
pub bootnodes: Vec<String>,

#[arg(long = "disable-finalization")]
pub disable_finalization: bool,

/// Log level. One of debug, info, warn, or error
#[arg(long = "log", default_value = "info")]
pub log_level: String,
Expand Down
7 changes: 7 additions & 0 deletions astria-conductor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ pub struct Config {
/// Address of the RPC server for execution
#[serde(default = "default_execution_rpc_url")]
pub execution_rpc_url: String,

/// Disable reading from the DA layer and block finalization
#[serde(default)]
pub disable_finalization: bool,

/// Bootnodes for the P2P network
pub bootnodes: Vec<String>,
}

// NOTE - using default fns instead of defaults in Cli because defaults
Expand Down
Loading

0 comments on commit 6fb7ed0

Please sign in to comment.