Skip to content

Commit

Permalink
address review comments - 4
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Dec 13, 2024
1 parent 529c645 commit b356f59
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 68 deletions.
3 changes: 1 addition & 2 deletions crates/astria-sequencer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ ASTRIA_SEQUENCER_METRICS_HTTP_LISTENER_ADDR="127.0.0.1:9000"
# `ASTRIA_SEQUENCER_FORCE_STDOUT` is set to `true`.
ASTRIA_SEQUENCER_PRETTY_PRINT=false

# If true disables streaming optimistic blocks to clients.
# if false enables the optimistic block grpc stream
# Disables streaming optimistic blocks to clients.
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS=false

# If set to any non-empty value removes ANSI escape characters from the pretty
Expand Down
36 changes: 17 additions & 19 deletions crates/astria-sequencer/src/app/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ pub(crate) struct EventReceiver<T> {
// We receive an Option over T because the sender side of the watch is designed to send
// Option values. This allows the sender value to send objects which do not have a `Default`
// implementation.
receiver: Receiver<Option<T>>,
// A token that is resolved when the sender side of the watch is initialized.
// It is used to wait for the sender side of the watch to be initialized before the
// receiver side can start receiving valid values.
inner: Receiver<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

Expand All @@ -33,21 +32,22 @@ where
// This is useful in situations where we want to ignore the current value of the watch
// and wait for the next value.
pub(crate) fn mark_latest_event_as_seen(&mut self) {
self.receiver.mark_unchanged();
self.inner.mark_unchanged();
}

// Returns the latest value of the event, waiting for the value to change if it hasn't already.
pub(crate) async fn receive(&mut self) -> astria_eyre::Result<T> {
// This will get resolved on the first send through the sender side of the watch
// i.e when the sender is initialized.
// This will get resolved when the sender side of the watch is initialized by sending
// the first value. We wait till the sender side of the watch has initialized the watch.
// Once the sender side has been initialized, it will get resolved immediately.
self.is_init.cancelled().await;
// We want to only receive the latest value through the receiver, so we wait for the
// current value in the watch to change before we return it.
self.receiver
self.inner
.changed()
.await
.wrap_err("error waiting for latest event")?;
Ok(self.receiver.borrow_and_update().clone().expect(
Ok(self.inner.borrow_and_update().clone().expect(
"events must be set after is_init is triggered; this means an invariant was violated",
))
}
Expand All @@ -56,21 +56,19 @@ where
/// `EventSender` contains the sender side of the events sent by the Sequencer App.
/// At any given time, it sends the latest value of the event.
struct EventSender<T> {
// The sender side of the watch which is used to send the latest value of the event.
// We use an Option here to allow for the sender to be initialized with a None value
// which allows the type to not have a Default value.
sender: Sender<Option<T>>,
// A token that is resolved when the sender side of the watch is initialized. It is used to
// wait for the sender side of the watch to be initialized before the receiver side can start
// receiving valid values.
// A watch channel that is always starts unset. Once set, the `is_init` token is cancelled
// and value in the channel will never be unset.
inner: Sender<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventSender<T> {
fn new() -> Self {
let (sender, _) = tokio::sync::watch::channel(None);
Self {
sender,
inner: sender,
is_init: CancellationToken::new(),
}
}
Expand All @@ -79,14 +77,14 @@ impl<T> EventSender<T> {
// used to receive the latest value of the event.
fn subscribe(&self) -> EventReceiver<T> {
EventReceiver {
receiver: self.sender.subscribe(),
inner: self.inner.subscribe(),
is_init: self.is_init.clone(),
}
}

// Sends the event to all the subscribers.
fn send(&self, event: T) {
self.sender.send_replace(Some(event));
self.inner.send_replace(Some(event));
// after sending the first value, we resolve the is_init token to signal that the sender
// side of the watch is initialized. The receiver side can now start receiving valid
// values.
Expand Down
18 changes: 9 additions & 9 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl App {
})
}

pub(crate) fn event_bus_subscription(&self) -> EventBusSubscription {
pub(crate) fn subscribe_to_events(&self) -> EventBusSubscription {
self.event_bus.subscribe()
}

Expand Down Expand Up @@ -447,6 +447,7 @@ impl App {
bail!("execution results must be present after executing transactions")
};

// FIXME - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
let sequencer_block = self
.post_execute_transactions(
process_proposal.hash,
Expand Down Expand Up @@ -552,7 +553,7 @@ impl App {

self.executed_proposal_hash = process_proposal.hash;

// TODO - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
// FIXME - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
let sequencer_block = self
.post_execute_transactions(
process_proposal.hash,
Expand Down Expand Up @@ -952,6 +953,7 @@ impl App {
finalize_block_tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2));
finalize_block_tx_results.extend(tx_results);

// FIXME - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
let sequencer_block = SequencerBlock::try_from_block_info_and_data(
block_hash,
chain_id,
Expand Down Expand Up @@ -994,8 +996,6 @@ impl App {
finalize_block: abci::request::FinalizeBlock,
storage: Storage,
) -> Result<abci::response::FinalizeBlock> {
let finalize_block_arc = Arc::new(finalize_block.clone());

// If we previously executed txs in a different proposal than is being processed,
// reset cached state changes.
if self.executed_proposal_hash != finalize_block.hash {
Expand All @@ -1008,8 +1008,8 @@ impl App {
rollup IDs commitment"
);

let height = finalize_block.height;
let block_hash = finalize_block.hash;
// FIXME: refactor to avoid cloning the finalize block
let finalize_block_arc = Arc::new(finalize_block.clone());

// When the hash is not empty, we have already executed and cached the results
if self.executed_proposal_hash.is_empty() {
Expand All @@ -1021,7 +1021,7 @@ impl App {
// we haven't executed anything yet, so set up the state for execution.
let block_data = BlockData {
misbehavior: finalize_block.misbehavior,
height,
height: finalize_block.height,
time,
next_validators_hash: finalize_block.next_validators_hash,
proposer_address,
Expand Down Expand Up @@ -1064,8 +1064,8 @@ impl App {
}

self.post_execute_transactions(
block_hash,
height,
finalize_block.hash,
finalize_block.height,
time,
proposer_address,
finalize_block.txs,
Expand Down
65 changes: 33 additions & 32 deletions crates/astria-sequencer/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@ pub(crate) mod storage;

use std::time::Duration;

use astria_core::generated::astria::sequencerblock::{
optimistic::v1alpha1::optimistic_block_service_server::OptimisticBlockServiceServer,
v1::sequencer_service_server::SequencerServiceServer,
};
use futures::{
future::Fuse,
FutureExt,
};
use astria_core::generated::astria::sequencerblock::v1::sequencer_service_server::SequencerServiceServer;
use astria_eyre::eyre::WrapErr as _;
pub(crate) use state_ext::{
StateReadExt,
StateWriteExt,
Expand Down Expand Up @@ -62,17 +56,15 @@ pub(crate) fn start_server(

let optimistic_streams_cancellation_token = CancellationToken::new();

let (optimistic_block_facade, mut optimistic_block_service_inner) = optimistic::new_service(
event_bus_subscription,
optimistic_streams_cancellation_token.child_token(),
);
let (optimistic_block_service, mut optimistic_block_task) = if no_optimistic_blocks {
(None, None)
} else {
let (optimistic_block_service, optimistic_block_task) = optimistic::new_service(
event_bus_subscription,
optimistic_streams_cancellation_token.child_token(),
);

let optimistic_block_service_server = {
if no_optimistic_blocks {
None
} else {
Some(OptimisticBlockServiceServer::new(optimistic_block_facade))
}
(Some(optimistic_block_service), Some(optimistic_block_task))
};

// TODO: setup HTTPS?
Expand All @@ -97,15 +89,7 @@ pub(crate) fn start_server(
.add_service(ChannelQueryServer::new(ibc.clone()))
.add_service(ConnectionQueryServer::new(ibc.clone()))
.add_service(SequencerServiceServer::new(sequencer_api))
.add_optional_service(optimistic_block_service_server);

let optimistic_block_inner_handle = {
if no_optimistic_blocks {
Fuse::terminated()
} else {
tokio::task::spawn(async move { optimistic_block_service_inner.run().await }).fuse()
}
};
.add_optional_service(optimistic_block_service);

info!(grpc_addr = grpc_addr.to_string(), "starting grpc server");

Expand All @@ -115,22 +99,39 @@ pub(crate) fn start_server(
_ = shutdown_rx => {
Ok("grpc server shutting down")
},
_ = optimistic_block_inner_handle => {
Err("optimistic block inner handle task exited")
res = async { optimistic_block_task.as_mut().unwrap().await }, if optimistic_block_task.is_some() => {
match res {
Ok(()) => {
Ok("optimistic block inner handle task exited successfully")
},
Err(e) => {
Err(e).wrap_err("optimistic block inner handle task exited with error")
}
}
}
};
optimistic_streams_cancellation_token.cancel();

// give time for the optimistic block service to shutdown all the streaming tasks.
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
if optimistic_block_task.is_some() {
// give time for the optimistic block service to shutdown all the streaming tasks.
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;

let optimistic_block_stream_task_handle = optimistic_block_task.as_mut().unwrap();
if optimistic_block_stream_task_handle.is_finished() {
info!("optimistic block service tasks shutdown gracefully");
} else {
warn!("optimistic block service tasks didn't shutdown gracefully, aborting the task manually");
optimistic_block_stream_task_handle.abort();
}
}

info_span!(SHUTDOWN_SPAN).in_scope(|| {
match reason {
Ok(reason) => {
info!(reason);
}
Err(reason) => {
warn!(reason);
warn!("{}", reason.to_string());
}
};
});
Expand Down
22 changes: 17 additions & 5 deletions crates/astria-sequencer/src/grpc/optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{

use astria_core::{
generated::astria::sequencerblock::optimistic::v1alpha1::{
optimistic_block_service_server::OptimisticBlockService,
optimistic_block_service_server::{
OptimisticBlockService,
OptimisticBlockServiceServer,
},
GetBlockCommitmentStreamRequest,
GetBlockCommitmentStreamResponse,
GetOptimisticBlockStreamRequest,
Expand All @@ -29,7 +32,10 @@ use tendermint::{
};
use tokio::{
sync::mpsc,
task::JoinSet,
task::{
JoinHandle,
JoinSet,
},
};
use tokio_util::sync::CancellationToken;
use tonic::{
Expand Down Expand Up @@ -64,13 +70,19 @@ type GrpcStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
pub(super) fn new_service(
event_bus_subscription: EventBusSubscription,
cancellation_token: CancellationToken,
) -> (OptimisticBlockFacade, OptimisticBlockStreamRunner) {
) -> (
OptimisticBlockServiceServer<OptimisticBlockFacade>,
JoinHandle<()>,
) {
let (tx, rx) = mpsc::channel(128);

let facade = OptimisticBlockFacade::new(tx);
let inner = OptimisticBlockStreamRunner::new(event_bus_subscription, rx, cancellation_token);

(facade, inner)
let inner_task = tokio::spawn(inner.run());
let server = OptimisticBlockServiceServer::new(facade);

(server, inner_task)
}

struct StartOptimisticBlockStreamRequest {
Expand Down Expand Up @@ -140,7 +152,7 @@ impl OptimisticBlockStreamRunner {
));
}

pub(super) async fn run(&mut self) {
pub(super) async fn run(mut self) {
loop {
tokio::select! {
biased;
Expand Down
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Sequencer {
.await
.wrap_err("failed to initialize app")?;

let event_bus_subscription = app.event_bus_subscription();
let event_bus_subscription = app.subscribe_to_events();

let consensus_service = tower::ServiceBuilder::new()
.layer(request_span::layer(|req: &ConsensusRequest| {
Expand Down

0 comments on commit b356f59

Please sign in to comment.