-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement Optimistic block grpc streams #1709
base: main
Are you sure you want to change the base?
Conversation
.wrap_err("failed to run post execute transactions handler")?; | ||
|
||
if let Some(optimistic_block_channels) = &self.optimistic_block_channels { | ||
if let Err(e) = optimistic_block_channels |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do not want to fail process_proposal if sending on the watch fails.
@@ -892,7 +952,7 @@ impl App { | |||
proposer_address: account::Id, | |||
txs: Vec<bytes::Bytes>, | |||
tx_results: Vec<ExecTxResult>, | |||
) -> Result<()> { | |||
) -> Result<SequencerBlock> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this would be faster since we do not have again read the SequencerBlock
from state when we want to send it via the optimistic block channels.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather not change this function, at least for now. As mentioned below, you are not in fact writing or reading to disk at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bharath-123 What was the decision for this? Why did we decide to keep it for now instead of just reading it from the ephemeral state?
@@ -942,7 +1002,7 @@ impl App { | |||
) | |||
.wrap_err("failed to convert block info and data to SequencerBlock")?; | |||
state_tx | |||
.put_sequencer_block(sequencer_block) | |||
.put_sequencer_block(sequencer_block.clone()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this may or may not be a bad idea, since we are cloning a potentially big struct? In my mind, reads and writes to memory are much faster than disk but I do not have a proper understanding on how the statedb works in our sequencer app.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are not writing to disk. You are at most incurrring another deserialization step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as for my comment above: we briefly discussed this, but I don't recall why we decided to clone the block here instead of reading it from the ephemeral state.
}; | ||
|
||
let (tx, rx) = | ||
tokio::sync::mpsc::channel::<Result<GetOptimisticBlockStreamResponse, Status>>(128); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
128 seems arbitrary, but what is the best way to decide this number?
6359d42
to
28a62d9
Compare
This PR is blocked on #1707 |
|
||
match tx.send(Ok(get_optimistic_block_stream_response)).await { | ||
Ok(()) => { | ||
debug!("sent optimistic block"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we log this? this will log multiple times with each new listener of the stream.
28a62d9
to
61988df
Compare
@@ -1,5 +1,6 @@ | |||
pub mod block; | |||
pub mod celestia; | |||
pub mod optimistic_block; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Final proto definitions are being worked on at #1707
9ea11d9
to
1e06c3c
Compare
pub(crate) fn send_committed_block(&self, block: Option<SequencerBlockCommit>) { | ||
if self.committed_block_sender().receiver_count() > 0 { | ||
if let Err(e) = self.committed_block_sender.send(block) { | ||
error!(error = %e, "failed to send committed block"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not error out process_proposal
or finalize_block
if sending a block on the watch fails
d066e78
to
b356f59
Compare
Summary
This PR implements the Optimistic block grpc stream apis protos of which were defined in #1519. These stream apis are used by auctioneer in order to optimistically simulate incoming bundles which are competing to be placed at the top-of-block for the given rollup.
Background
We have use cases where users of a given rollup would want to simulate a bundle on top of the latest block. The issue with simulating for rollup users is that the sequencer block times are too small and it would be difficult to simulate a bundle on top of the current latest block and submit it to the sequencer in time to get the bundle included in the next subsequent block.
To increase the time available for bundle simulation, the rollup can simulate on top of a block which has not yet been committed yet i.e a block which has not yet been finalized and is undergoing consensus.
This PR provides two grpc streams in order to enable that:
getOptimisticBlockStream
: This grpc method returns a stream given a rollup id. The returned stream produces the latest optimistic blocks which are blocks which have finishedprocess_proposal
.getBlockCommitmentStream
: This grpc methods returns a stream which produces the block hash and block height of the latest committed blocks. These are blocks which have finishedfinalize_block
.The Optimistic block grpc streams are flagged behind
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS
. Ideally the grpc streams would run on a dedicated full node which is not validating.Changes
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS
to the sequencer environment which feature flags the optimistic block grpc streamsEventBus
which can be subscribed to, to listen to blocks undergoing consensus in process_proposal and blocks being finalized in finalize_block.EventReceivers
andEventSender
types which are generic types which contains watches over which the specified data can be sent.OptimisticBlockFacade
which is the struct over which the optimistic block service apis are implemented.OptimisticBlockServiceInner
which is a struct which contains a Joinset of the stream tasks. This struct defines a long lived task which in turn spawns stream tasks on the joinset when clients try to subscribe to the optimistic block api streams.start_grpc_server
tocrate::grpc::mod.rs
and renamed it tostart_server
Testing
Deployed a sequencer network locally and wrote a grpc client to read from the stream.
Related Issues
closes #1563