Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Optimistic block grpc streams #1709

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
376efa0
move optimistic block protos to v1
bharath-123 Nov 19, 2024
f01e946
move optimistic block protos to v1
bharath-123 Nov 5, 2024
c8d1282
implement optimistic block apis
bharath-123 Dec 9, 2024
d25ab44
send commited block from finalize_block
bharath-123 Oct 22, 2024
40b9cd1
add ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCK to configmaps
bharath-123 Oct 22, 2024
bde1443
fix lint issues
bharath-123 Oct 22, 2024
fcb0ca3
remove debug trait from OptimisticBlockChannels
bharath-123 Oct 22, 2024
f1718d8
minor updates
bharath-123 Oct 22, 2024
c9457ba
update sequencer chart version
bharath-123 Oct 29, 2024
2f98dee
minor changes
bharath-123 Oct 22, 2024
2a0c342
just use Ok while sending data over grpc stream instead of type casti…
bharath-123 Oct 22, 2024
60a73b0
update optimistic block imports
bharath-123 Dec 9, 2024
715be7f
minor updates
bharath-123 Oct 29, 2024
7f98e48
move optimistic block grpc wrappers to v1
bharath-123 Oct 30, 2024
b0f42db
rebase from pr 1707
bharath-123 Dec 9, 2024
819341d
address review comments
bharath-123 Dec 9, 2024
776bc1f
minor updates
bharath-123 Nov 18, 2024
2ec212e
address review comments
bharath-123 Nov 18, 2024
4e3a529
make task functions free standing
bharath-123 Nov 18, 2024
975fc5c
minor renaming
bharath-123 Nov 19, 2024
f781ffc
rebase protos
bharath-123 Dec 9, 2024
ac4ced8
rename optimistic_block to optimistic
bharath-123 Nov 19, 2024
9d2fb0b
address review comments
bharath-123 Dec 9, 2024
81d91e4
use in_scope to emit events under a specific span
bharath-123 Nov 26, 2024
3db6a8b
add a comment
bharath-123 Nov 26, 2024
5479159
add documentation to event bus
bharath-123 Nov 26, 2024
0347354
run fmt + lint
bharath-123 Nov 26, 2024
04327a6
fix comment
bharath-123 Nov 26, 2024
b4191ba
lint + fmt
bharath-123 Dec 9, 2024
61b98ed
only stream new values for the optimistic block and block commitment
bharath-123 Dec 9, 2024
529c645
add some comments
bharath-123 Dec 11, 2024
b356f59
address review comments - 4
bharath-123 Dec 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/sequencer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ version: 1.0.1
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.0.0"
appVersion: "1.0.1"

dependencies:
- name: sequencer-relayer
Expand Down
1 change: 1 addition & 0 deletions charts/sequencer/templates/configmaps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS: "{{ not .Values.sequencer.optimisticBlockApis.enabled }}"
{{- end }}
---
2 changes: 2 additions & 0 deletions charts/sequencer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ sequencer:
tracesTimeout: 10
otlpHeaders:
traceHeaders:
optimisticBlockApis:
enabled: false

cometbft:
config:
Expand Down
1 change: 1 addition & 0 deletions crates/astria-core/src/sequencerblock/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod block;
pub mod celestia;
pub mod optimistic;

pub use block::{
RollupTransactions,
Expand Down
83 changes: 83 additions & 0 deletions crates/astria-core/src/sequencerblock/v1/optimistic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use bytes::Bytes;

use crate::{
generated::astria::sequencerblock::optimistic::v1alpha1 as raw,
Protobuf,
};

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct SequencerBlockCommitError(SequencerBlockCommitErrorKind);

impl SequencerBlockCommitError {
fn invalid_block_hash(len: usize) -> Self {
Self(SequencerBlockCommitErrorKind::InvalidBlockHash(len))
}
}

#[derive(Debug, thiserror::Error)]
enum SequencerBlockCommitErrorKind {
#[error("invalid block hash length: {0}")]
InvalidBlockHash(usize),
}

#[derive(Clone, Debug)]
pub struct SequencerBlockCommit {
height: u64,
block_hash: [u8; 32],
}

impl SequencerBlockCommit {
#[must_use]
pub fn new(height: u64, block_hash: [u8; 32]) -> Self {
Self {
height,
block_hash,
}
}

#[must_use]
pub fn height(&self) -> u64 {
self.height
}

#[must_use]
pub fn block_hash(&self) -> &[u8; 32] {
&self.block_hash
}
}

impl From<SequencerBlockCommit> for raw::SequencerBlockCommit {
fn from(value: SequencerBlockCommit) -> Self {
value.to_raw()
}
}

impl Protobuf for SequencerBlockCommit {
type Error = SequencerBlockCommitError;
type Raw = raw::SequencerBlockCommit;

fn try_from_raw_ref(raw: &Self::Raw) -> Result<Self, Self::Error> {
let Self::Raw {
height,
block_hash,
} = raw;

let block_hash = block_hash
.as_ref()
.try_into()
.map_err(|_| SequencerBlockCommitError::invalid_block_hash(block_hash.len()))?;

Ok(SequencerBlockCommit {
height: *height,
block_hash,
})
}

fn to_raw(&self) -> Self::Raw {
raw::SequencerBlockCommit {
height: self.height(),
block_hash: Bytes::copy_from_slice(self.block_hash()),
}
}
}
1 change: 1 addition & 0 deletions crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ tendermint-proto = { workspace = true }
tendermint = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "tracing"] }
tokio-util = { workspace = true, features = ["rt"] }
tonic = { workspace = true }
tracing = { workspace = true }

Expand Down
3 changes: 3 additions & 0 deletions crates/astria-sequencer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ ASTRIA_SEQUENCER_METRICS_HTTP_LISTENER_ADDR="127.0.0.1:9000"
# `ASTRIA_SEQUENCER_FORCE_STDOUT` is set to `true`.
ASTRIA_SEQUENCER_PRETTY_PRINT=false

# Disables streaming optimistic blocks to clients.
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS=false

# If set to any non-empty value removes ANSI escape characters from the pretty
# printed output. Note that this does nothing unless `ASTRIA_SEQUENCER_PRETTY_PRINT`
# is set to `true`.
Expand Down
158 changes: 158 additions & 0 deletions crates/astria-sequencer/src/app/event_bus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::sync::Arc;

use astria_core::sequencerblock::v1::SequencerBlock;
use astria_eyre::eyre::WrapErr as _;
use tendermint::abci::request::FinalizeBlock;
use tokio::sync::watch::{
Receiver,
Sender,
};
use tokio_util::sync::CancellationToken;

/// `EventReceiver` contains the receiver side of the events sent by the Sequencer App.
/// The listeners of the events can receive the latest value of the event by calling the
/// `receive` method.
#[derive(Clone)]
pub(crate) struct EventReceiver<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add explanations on what the fields do (specifically, is_init because it might not be immediately obvious to a reader).

// The receiver side of the watch which is read for the latest value of the event.
// We receive an Option over T because the sender side of the watch is designed to send
// Option values. This allows the sender value to send objects which do not have a `Default`
// implementation.
inner: Receiver<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventReceiver<T>
where
T: Clone,
{
// Marks the current message in the receiver end of the watch as seen.
// This is useful in situations where we want to ignore the current value of the watch
// and wait for the next value.
pub(crate) fn mark_latest_event_as_seen(&mut self) {
self.inner.mark_unchanged();
}

// Returns the latest value of the event, waiting for the value to change if it hasn't already.
pub(crate) async fn receive(&mut self) -> astria_eyre::Result<T> {
// This will get resolved when the sender side of the watch is initialized by sending
// the first value. We wait till the sender side of the watch has initialized the watch.
// Once the sender side has been initialized, it will get resolved immediately.
self.is_init.cancelled().await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note that this will resolve immediately once initialized.

// We want to only receive the latest value through the receiver, so we wait for the
// current value in the watch to change before we return it.
self.inner
.changed()
.await
.wrap_err("error waiting for latest event")?;
Ok(self.inner.borrow_and_update().clone().expect(
"events must be set after is_init is triggered; this means an invariant was violated",
))
}
}

/// `EventSender` contains the sender side of the events sent by the Sequencer App.
/// At any given time, it sends the latest value of the event.
struct EventSender<T> {
// A watch channel that is always starts unset. Once set, the `is_init` token is cancelled
// and value in the channel will never be unset.
inner: Sender<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventSender<T> {
fn new() -> Self {
let (sender, _) = tokio::sync::watch::channel(None);
Self {
inner: sender,
is_init: CancellationToken::new(),
}
}

// Returns a `EventReceiver` object that contains the receiver side of the watch which can be
// used to receive the latest value of the event.
fn subscribe(&self) -> EventReceiver<T> {
EventReceiver {
inner: self.inner.subscribe(),
is_init: self.is_init.clone(),
}
}

// Sends the event to all the subscribers.
fn send(&self, event: T) {
self.inner.send_replace(Some(event));
// after sending the first value, we resolve the is_init token to signal that the sender
// side of the watch is initialized. The receiver side can now start receiving valid
// values.
self.is_init.cancel();
}
}

/// `EventBusSubscription` contains [`EventReceiver`] of various events that can be subscribed.
/// It can be cloned by various components in the sequencer app to receive events.
#[derive(Clone)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the clone (and mentioned in App): I don't think this should be cloned anywhere - only App should send things over these channels. Instead, provide something like:

#[derive(Clone)]
pub(crate) struct EventBusSubscription {
    process_proposal_blocks: EventReceiver<Arc<SequencerBlock>>,
    finalize_blocks: EventReceiver<Arc<SequencerBlockCommit>>,
}

pub(crate) struct EventBusSubscription {
process_proposal_blocks: EventReceiver<Arc<SequencerBlock>>,
finalized_blocks: EventReceiver<Arc<FinalizeBlock>>,
}

impl EventBusSubscription {
pub(crate) fn process_proposal_blocks(&mut self) -> EventReceiver<Arc<SequencerBlock>> {
self.process_proposal_blocks.clone()
}

pub(crate) fn finalized_blocks(&mut self) -> EventReceiver<Arc<FinalizeBlock>> {
self.finalized_blocks.clone()
}
}

/// The Sequencer `EventBus` is used to send and receive events between different components of the
/// sequencer. Components of Sequencer can subscribe to the `EventBus` via the `subscribe` method
/// which returns a [`EventBusSubscription`] objects that contains receivers of various events which
/// are of type [`EventReceiver`].
///
/// The `EventBus` is implemented using [`tokio::sync::watch`] which allows for multiple receivers
/// to receive the event at any given time.
pub(super) struct EventBus {
// Sends a process proposal block event to the subscribers. The event is sent in the form of a
// sequencer block which is created during the process proposal block phase.
process_proposal_block_sender: EventSender<Arc<SequencerBlock>>,
// Sends a finalized block event to the subscribers. The event is sent in the form of the
// finalize block abci request.
finalized_block_sender: EventSender<Arc<FinalizeBlock>>,
}

impl EventBus {
pub(super) fn new() -> Self {
let process_proposal_block_sender = EventSender::new();
let finalized_block_sender = EventSender::new();

Self {
process_proposal_block_sender,
finalized_block_sender,
}
}

// Returns a `EventBusSubscription` object that contains receivers of various events that can
// be subscribed to.
pub(crate) fn subscribe(&self) -> EventBusSubscription {
EventBusSubscription {
process_proposal_blocks: self.process_proposal_block_sender.subscribe(),
finalized_blocks: self.finalized_block_sender.subscribe(),
}
}

// Sends a process proposal block event to the subscribers.
pub(super) fn send_process_proposal_block(&self, sequencer_block: Arc<SequencerBlock>) {
self.process_proposal_block_sender.send(sequencer_block);
}

// Sends a finalized block event to the subscribers.
pub(super) fn send_finalized_block(&self, sequencer_block_commit: Arc<FinalizeBlock>) {
self.finalized_block_sender.send(sequencer_block_commit);
}
}
Loading
Loading