forked from rollkit/rollkit
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
hacking rollkit to work with astria shared sequencer
- Loading branch information
1 parent
d91adbc
commit b618beb
Showing
12 changed files
with
1,490 additions
and
170 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package execution | ||
|
||
import ( | ||
log "log/slog" | ||
"net" | ||
"sync" | ||
|
||
astriaGrpc "buf.build/gen/go/astria/astria/grpc/go/astria/execution/v1alpha2/executionv1alpha2grpc" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
// GRPCServerHandler is the gRPC server handler. | ||
// It gives us a way to attach the gRPC server to the node so it can be stopped on shutdown. | ||
type GRPCServerHandler struct { | ||
mu sync.Mutex | ||
|
||
endpoint string | ||
server *grpc.Server | ||
executionServiceServerV1a2 *astriaGrpc.ExecutionServiceServer | ||
} | ||
|
||
// NewServer creates a new gRPC server. | ||
// It registers the execution service server. | ||
// It registers the gRPC server with the node so it can be stopped on shutdown. | ||
func NewGRPCServerHandler(execServ astriaGrpc.ExecutionServiceServer, endpoint string) *GRPCServerHandler { | ||
server := grpc.NewServer() | ||
|
||
log.Info("gRPC server enabled", "endpoint", endpoint) | ||
|
||
handler := &GRPCServerHandler{ | ||
endpoint: endpoint, | ||
server: server, | ||
executionServiceServerV1a2: &execServ, | ||
} | ||
|
||
astriaGrpc.RegisterExecutionServiceServer(server, execServ) | ||
return handler | ||
} | ||
|
||
// Start starts the gRPC server if it is enabled. | ||
func (handler *GRPCServerHandler) Start() error { | ||
handler.mu.Lock() | ||
defer handler.mu.Unlock() | ||
|
||
if handler.endpoint == "" { | ||
return nil | ||
} | ||
|
||
// Start the gRPC server | ||
lis, err := net.Listen("tcp", handler.endpoint) | ||
if err != nil { | ||
return err | ||
} | ||
go handler.server.Serve(lis) | ||
log.Info("gRPC server started", "endpoint", handler.endpoint) | ||
return nil | ||
} | ||
|
||
// Stop stops the gRPC server. | ||
func (handler *GRPCServerHandler) Stop() error { | ||
handler.mu.Lock() | ||
defer handler.mu.Unlock() | ||
|
||
handler.server.GracefulStop() | ||
log.Info("gRPC server stopped", "endpoint", handler.endpoint) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
package execution | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
astriaGrpc "buf.build/gen/go/astria/astria/grpc/go/astria/execution/v1alpha2/executionv1alpha2grpc" | ||
astriaPb "buf.build/gen/go/astria/astria/protocolbuffers/go/astria/execution/v1alpha2" | ||
cmbytes "github.com/cometbft/cometbft/libs/bytes" | ||
"github.com/cometbft/cometbft/libs/log" | ||
"github.com/rollkit/rollkit/block" | ||
"github.com/rollkit/rollkit/store" | ||
"github.com/rollkit/rollkit/types" | ||
codes "google.golang.org/grpc/codes" | ||
status "google.golang.org/grpc/status" | ||
"google.golang.org/protobuf/types/known/timestamppb" | ||
) | ||
|
||
type ExecutionServiceServerV1Alpha2 struct { | ||
// NOTE - from the generated code: All implementations must embed | ||
// UnimplementedExecutionServiceServer for forward compatibility | ||
astriaGrpc.UnimplementedExecutionServiceServer | ||
|
||
store store.Store | ||
blockManager *block.SSManager | ||
logger log.Logger | ||
blockExecutionLock sync.Mutex | ||
} | ||
|
||
func NewExecutionServiceServerV1Alpha2(blockManager *block.SSManager, store store.Store, logger log.Logger) *ExecutionServiceServerV1Alpha2 { | ||
return &ExecutionServiceServerV1Alpha2{ | ||
blockManager: blockManager, | ||
store: store, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// GetBlock retrieves a block by its identifier. | ||
func (s *ExecutionServiceServerV1Alpha2) GetBlock(ctx context.Context, req *astriaPb.GetBlockRequest) (*astriaPb.Block, error) { | ||
s.logger.Info("GetBlock called", "request", req) | ||
|
||
res, err := s.getBlockFromIdentifier(ctx, req.GetIdentifier()) | ||
if err != nil { | ||
s.logger.Error("failed finding block", err) | ||
return nil, err | ||
} | ||
|
||
s.logger.Info("GetBlock completed", "request", req, "response", res) | ||
return res, nil | ||
} | ||
|
||
// BatchGetBlocks will return an array of Blocks given an array of block identifiers. | ||
func (s *ExecutionServiceServerV1Alpha2) BatchGetBlocks(ctx context.Context, req *astriaPb.BatchGetBlocksRequest) (*astriaPb.BatchGetBlocksResponse, error) { | ||
s.logger.Info("BatchGetBlocks called", "request", req) | ||
var blocks []*astriaPb.Block | ||
|
||
ids := req.GetIdentifiers() | ||
for _, id := range ids { | ||
block, err := s.getBlockFromIdentifier(ctx, id) | ||
if err != nil { | ||
s.logger.Error("failed finding block with id", id, "error", err) | ||
return nil, err | ||
} | ||
|
||
blocks = append(blocks, block) | ||
} | ||
|
||
res := &astriaPb.BatchGetBlocksResponse{ | ||
Blocks: blocks, | ||
} | ||
|
||
s.logger.Info("BatchGetBlocks completed", "request", req, "response", res) | ||
return res, nil | ||
} | ||
|
||
// ExecuteBlock drives deterministic derivation of a rollup block from sequencer block data | ||
func (s *ExecutionServiceServerV1Alpha2) ExecuteBlock(ctx context.Context, req *astriaPb.ExecuteBlockRequest) (*astriaPb.Block, error) { | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
default: | ||
} | ||
|
||
s.logger.Info("ExecuteBlock called", "request", req) | ||
|
||
s.blockExecutionLock.Lock() | ||
defer s.blockExecutionLock.Unlock() | ||
|
||
txs := make(types.Txs, len(req.Transactions)) | ||
for i := range txs { | ||
txs[i] = types.Tx(req.Transactions[i]) | ||
} | ||
|
||
block, err := s.blockManager.PublishBlock(ctx, types.Hash(req.PrevBlockHash), req.Timestamp.AsTime(), txs) | ||
if err != nil { | ||
s.logger.Error("failed to publish block to chain", "hash", block.Hash(), "prevHash", req.PrevBlockHash, "err", err) | ||
return nil, status.Error(codes.Internal, "failed to insert block to chain") | ||
} | ||
|
||
res := &astriaPb.Block{ | ||
Number: uint32(block.Height()), | ||
Hash: cmbytes.HexBytes(block.Hash()), | ||
ParentBlockHash: cmbytes.HexBytes(block.LastHeader()), | ||
Timestamp: timestamppb.New(block.Time()), | ||
} | ||
|
||
s.logger.Info("ExecuteBlock completed", "request", req, "response", res) | ||
return res, nil | ||
} | ||
|
||
// GetCommitmentState fetches the current CommitmentState of the chain. | ||
func (s *ExecutionServiceServerV1Alpha2) GetCommitmentState(ctx context.Context, req *astriaPb.GetCommitmentStateRequest) (*astriaPb.CommitmentState, error) { | ||
s.logger.Info("GetCommitmentState called", "request", req) | ||
return nil, nil | ||
} | ||
|
||
// UpdateCommitmentState replaces the whole CommitmentState with a new CommitmentState. | ||
func (s *ExecutionServiceServerV1Alpha2) UpdateCommitmentState(ctx context.Context, req *astriaPb.UpdateCommitmentStateRequest) (*astriaPb.CommitmentState, error) { | ||
s.logger.Info("UpdateCommitmentState called", "request", req) | ||
return nil, nil | ||
} | ||
|
||
func (s *ExecutionServiceServerV1Alpha2) getBlockFromIdentifier(ctx context.Context, identifier *astriaPb.BlockIdentifier) (*astriaPb.Block, error) { | ||
var block *types.Block | ||
var err error | ||
|
||
switch idType := identifier.Identifier.(type) { | ||
case *astriaPb.BlockIdentifier_BlockNumber: | ||
block, err = s.store.GetBlock(ctx, uint64(identifier.GetBlockNumber())) | ||
if err != nil { | ||
return nil, err | ||
} | ||
case *astriaPb.BlockIdentifier_BlockHash: | ||
block, err = s.store.GetBlockByHash(ctx, types.Hash(identifier.GetBlockHash())) | ||
if err != nil { | ||
return nil, err | ||
} | ||
default: | ||
return nil, status.Errorf(codes.InvalidArgument, "identifier has unexpected type %T", idType) | ||
} | ||
|
||
if block == nil { | ||
return nil, status.Errorf(codes.NotFound, "Couldn't locate block with identifier %s", identifier.Identifier) | ||
} | ||
|
||
return &astriaPb.Block{ | ||
Number: uint32(block.Height()), | ||
Hash: cmbytes.HexBytes(block.Hash()), | ||
ParentBlockHash: cmbytes.HexBytes(block.LastHeader()), | ||
Timestamp: timestamppb.New(block.Time()), | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package mempool | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/rollkit/rollkit/mempool" | ||
"github.com/rollkit/rollkit/types" | ||
|
||
"github.com/rollkit/rollkit/astria/sequencer" | ||
) | ||
|
||
type MempoolReaper struct { | ||
c *sequencer.Client | ||
mempool *mempool.CListMempool | ||
|
||
mu sync.Mutex | ||
started bool | ||
stopCh chan struct{} | ||
} | ||
|
||
func NewMempoolReaper(client *sequencer.Client, mempool *mempool.CListMempool) *MempoolReaper { | ||
return &MempoolReaper{ | ||
c: client, | ||
mempool: mempool, | ||
started: false, | ||
stopCh: make(chan struct{}), | ||
} | ||
} | ||
|
||
// reap tx from the mempool as they occur | ||
func (mr *MempoolReaper) Reap() { | ||
for { | ||
select { | ||
case <-mr.stopCh: | ||
return | ||
default: | ||
// wait for tx to be in mempool | ||
ch := mr.mempool.TxsWaitChan() | ||
<-ch | ||
|
||
// get first tx in pool | ||
tx0 := mr.mempool.TxsFront() | ||
TxNext: | ||
for { | ||
select { | ||
case <-mr.stopCh: | ||
return | ||
default: | ||
mempoolTx := tx0.Value.(*mempoolTx) | ||
|
||
// submit to shared sequencer | ||
res, err := mr.c.BroadcastTx(mempoolTx.tx) | ||
if err != nil { | ||
panic(fmt.Sprintf("error sending message: %s\n", err)) | ||
} | ||
println(res.Log) | ||
|
||
// wait for next tx | ||
tx0 = tx0.NextWait() | ||
|
||
// tx was last element and was removed (pool is empty?) | ||
if tx0 == nil { | ||
break TxNext | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (mr *MempoolReaper) Start() error { | ||
mr.mu.Lock() | ||
defer mr.mu.Unlock() | ||
|
||
// Ensure Reap is only run once | ||
if mr.started { | ||
return nil | ||
} | ||
|
||
go mr.Reap() | ||
mr.started = true | ||
return nil | ||
} | ||
|
||
func (mr *MempoolReaper) Stop() error { | ||
mr.mu.Lock() | ||
defer mr.mu.Unlock() | ||
|
||
if !mr.started { | ||
return nil | ||
} | ||
|
||
close(mr.stopCh) | ||
mr.started = false | ||
return nil | ||
} | ||
|
||
// copied from rollkit clist_mempool.go | ||
//-------------------------------------------------------------------------------- | ||
|
||
// mempoolTx is a transaction that successfully ran | ||
type mempoolTx struct { | ||
height uint64 // height that this tx had been validated in | ||
gasWanted int64 // amount of gas this tx states it will require | ||
tx types.Tx // | ||
|
||
// ids of peers who've sent us this tx (as a map for quick lookups). | ||
// senders: PeerID -> bool | ||
senders sync.Map | ||
} |
Oops, something went wrong.