feat(batcher): multi-frame altda channels
samlaf committed Nov 26, 2024
1 parent c898ed1 commit bb0ba93
Showing 11 changed files with 76 additions and 55 deletions.
6 changes: 3 additions & 3 deletions op-batcher/batcher/channel.go
@@ -132,22 +132,22 @@ func (c *channel) ID() derive.ChannelID {
// NextTxData should only be called after HasTxData returned true.
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
txdata := txData{frames: make([]frameData, 0, nf), daType: c.cfg.DaType}
for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

id := txdata.ID().String()
c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType)
c.pendingTransactions[id] = txdata

return txdata
}

func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
c.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasPendingFrame()
}
// Collect enough frames if channel is not full yet
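
The change above means the one-frame-per-transaction restriction now applies only to calldata channels; blob and altDA channels may batch up to MaxFramesPerTx pending frames into a single txData. A minimal, self-contained sketch of that frame-collection loop, using stand-in types rather than the real channelBuilder API:

package main

import "fmt"

// frameData is a stand-in for the batcher's frame type in this sketch.
type frameData struct{ id int }

// nextTxFrames mirrors the loop in channel.NextTxData: pop up to maxFrames
// pending frames into one transaction's payload.
func nextTxFrames(pending *[]frameData, maxFrames int) []frameData {
    frames := make([]frameData, 0, maxFrames)
    for i := 0; i < maxFrames && len(*pending) > 0; i++ {
        frames = append(frames, (*pending)[0])
        *pending = (*pending)[1:]
    }
    return frames
}

func main() {
    pending := []frameData{{1}, {2}, {3}, {4}, {5}}
    // With a blob or altDA channel maxFrames can exceed 1; with calldata it is 1.
    fmt.Println(len(nextTxFrames(&pending, 3)), len(pending)) // prints: 3 2
}
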
11 changes: 7 additions & 4 deletions op-batcher/batcher/channel_config.go
@@ -46,9 +46,12 @@ type ChannelConfig struct {
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint

// UseBlobs indicates that this channel should be sent as a multi-blob
// transaction with one blob per frame.
UseBlobs bool
// DaType indicates how the frames in this channel should be sent to the L1.
DaType DaType
}

func (cc ChannelConfig) UseBlobs() bool {
return cc.DaType == DaTypeBlob
}

// ChannelConfig returns a copy of the receiver.
@@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() {
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.UseBlobs {
if cc.DaType == DaTypeCalldata {
return 1
}
return cc.TargetNumFrames
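
ChannelConfig now stores a DaType instead of the UseBlobs boolean, with UseBlobs() kept as an accessor for call sites that only care about the blob/non-blob distinction. A condensed, self-contained sketch of how the two helpers behave under the new field (illustrative stand-ins, not the actual op-batcher package):

package main

import "fmt"

// DaType mirrors the enum introduced in tx_data.go (sketch only).
type DaType int

const (
    DaTypeCalldata DaType = iota
    DaTypeBlob
    DaTypeAltDA
)

type ChannelConfig struct {
    TargetNumFrames int
    DaType          DaType
}

// UseBlobs reports whether frames are sent as blobs.
func (cc ChannelConfig) UseBlobs() bool { return cc.DaType == DaTypeBlob }

// MaxFramesPerTx: calldata carries one frame per tx; blob and altDA channels
// target TargetNumFrames frames per tx.
func (cc ChannelConfig) MaxFramesPerTx() int {
    if cc.DaType == DaTypeCalldata {
        return 1
    }
    return cc.TargetNumFrames
}

func main() {
    for _, cc := range []ChannelConfig{
        {TargetNumFrames: 6, DaType: DaTypeCalldata},
        {TargetNumFrames: 6, DaType: DaTypeBlob},
        {TargetNumFrames: 6, DaType: DaTypeAltDA},
    } {
        fmt.Println(cc.UseBlobs(), cc.MaxFramesPerTx()) // false 1, then true 6, then false 6
    }
}
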
3 changes: 2 additions & 1 deletion op-batcher/batcher/channel_config_provider_test.go
@@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) {
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
DaType: DaTypeBlob,
}

tests := []struct {
10 changes: 5 additions & 5 deletions op-batcher/batcher/channel_manager.go
@@ -214,16 +214,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
newCfg := s.cfgProvider.ChannelConfig()

// No change:
if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() {
s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
"useBlobs", s.defaultCfg.UseBlobs)
"useBlobs", s.defaultCfg.UseBlobs())
return s.nextTxData(channel)
}

// Change:
s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...",
"useBlobsBefore", s.defaultCfg.UseBlobs,
"useBlobsAfter", newCfg.UseBlobs)
"useBlobsBefore", s.defaultCfg.UseBlobs(),
"useBlobsAfter", newCfg.UseBlobs())

// Invalidate the channel so its blocks
// get requeued:
@@ -326,7 +326,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.UseBlobs,
"da_type", cfg.DaType,
)
s.metr.RecordChannelOpened(pc.ID(), s.blocks.Len())

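
The dynamic channel config still compares only the blob/non-blob dimension, now through the UseBlobs() accessor, so the current channel is invalidated and its blocks requeued only when the recomputed config flips between blob and calldata DA. A small sketch of that decision with stand-in types:

package main

import "fmt"

type DaType int

const (
    DaTypeCalldata DaType = iota
    DaTypeBlob
    DaTypeAltDA
)

func useBlobs(d DaType) bool { return d == DaTypeBlob }

// needsRequeue mirrors the check in channelManager.TxData: only a switch
// between blob and non-blob DA forces a requeue of the channel's blocks.
func needsRequeue(current, recomputed DaType) bool {
    return useBlobs(current) != useBlobs(recomputed)
}

func main() {
    fmt.Println(needsRequeue(DaTypeBlob, DaTypeCalldata)) // true: requeue and switch
    fmt.Println(needsRequeue(DaTypeAltDA, DaTypeAltDA))   // false: keep the current channel
}
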
9 changes: 5 additions & 4 deletions op-batcher/batcher/channel_manager_test.go
@@ -290,11 +290,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger,
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
DaType: DaTypeBlob,
}
calldataCfg.InitNoneCompressor()
blobCfg.InitNoneCompressor()
@@ -348,7 +349,7 @@ func TestChannelManager_TxData(t *testing.T) {

cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob)

// Seed channel manager with a block
rng := rand.New(rand.NewSource(99))
@@ -385,8 +386,8 @@ }
}

require.Equal(t, tc.numExpectedAssessments, cfg.assessments)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob)
})
}

4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_test.go
@@ -131,7 +131,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
const n = 6
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: false,
DaType: DaTypeCalldata,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
@@ -172,7 +172,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
const n = eth.MaxBlobsPerBlobTx
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: true,
DaType: DaTypeBlob,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
29 changes: 12 additions & 17 deletions op-batcher/batcher/driver.go
@@ -765,14 +765,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh

// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
// sanity checks
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
if txdata.asBlob {
l.Log.Crit("Unexpected blob txdata with AltDA enabled")
}

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
@@ -810,29 +802,32 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
var err error

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
var candidate *txmgr.TxCandidate
switch txdata.daType {
case DaTypeAltDA:
if !l.Config.UseAltDA {
l.Log.Crit("Received AltDA type txdata without AltDA being enabled")
}
// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}

var candidate *txmgr.TxCandidate
if txdata.asBlob {
case DaTypeBlob:
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
case DaTypeCalldata:
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
candidate = l.calldataTxCandidate(txdata.CallData())
default:
l.Log.Crit("Unknown DA type", "da_type", txdata.daType)
}

l.sendTx(txdata, false, candidate, queue, receiptsCh)
@@ -850,7 +845,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
candidate.GasLimit = intrinsicGas
}

queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
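
sendTransaction now dispatches on each txData's own daType rather than on the batcher-wide AltDA/blob configuration, and the single-frame sanity check moves into the calldata case. A self-contained sketch of the dispatch shape; the helper, its parameters, and its return strings are illustrative only, not the real txmgr flow:

package main

import (
    "errors"
    "fmt"
)

type DaType int

const (
    DaTypeCalldata DaType = iota
    DaTypeBlob
    DaTypeAltDA
)

// route mirrors the switch added to BatchSubmitter.sendTransaction.
func route(daType DaType, altDAEnabled bool, numFrames int) (string, error) {
    switch daType {
    case DaTypeAltDA:
        if !altDAEnabled {
            return "", errors.New("received AltDA txdata without AltDA being enabled")
        }
        return "post the frames to the DA provider, then submit the commitment to L1", nil
    case DaTypeBlob:
        return "submit a blob tx with one blob per frame", nil
    case DaTypeCalldata:
        if numFrames != 1 {
            return "", fmt.Errorf("unexpected number of frames in calldata tx: %d", numFrames)
        }
        return "submit a calldata tx carrying the single frame", nil
    default:
        return "", fmt.Errorf("unknown DA type: %d", daType)
    }
}

func main() {
    action, err := route(DaTypeAltDA, true, 4)
    fmt.Println(action, err) // multi-frame altDA txdata is now accepted
}
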
40 changes: 25 additions & 15 deletions op-batcher/batcher/service.go
@@ -218,30 +218,40 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
TargetNumFrames: cfg.TargetNumFrames,
SubSafetyMargin: cfg.SubSafetyMargin,
BatchType: cfg.BatchType,
// DaType: set below
}

switch cfg.DataAvailabilityType {
case flags.BlobsType, flags.AutoType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
if bs.UseAltDA {
if cfg.DataAvailabilityType == flags.CalldataType {
cc.DaType = DaTypeAltDA
} else {
return fmt.Errorf("altDA is currently only supported with calldata DA Type")
}
cc.UseBlobs = true
case flags.CalldataType: // do nothing
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
if cc.MaxFrameSize > altda.MaxInputSize {
return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
}
} else {

if bs.UseAltDA && cc.MaxFrameSize > altda.MaxInputSize {
return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
switch cfg.DataAvailabilityType {
case flags.BlobsType, flags.AutoType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
}
cc.DaType = DaTypeBlob
case flags.CalldataType: // do nothing
cc.DaType = DaTypeCalldata
default:
return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
}
}

cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)

if cc.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
if cc.UseBlobs() && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
return errors.New("cannot use Blobs before Ecotone")
}
if !cc.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
if !cc.UseBlobs() && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
}

@@ -273,7 +283,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
calldataCC := cc
calldataCC.TargetNumFrames = 1
calldataCC.MaxFrameSize = 120_000
calldataCC.UseBlobs = false
calldataCC.DaType = DaTypeCalldata
calldataCC.ReinitCompressorConfig()

bs.ChannelConfig = NewDynamicEthChannelConfig(bs.Log, 10*time.Second, bs.TxManager, cc, calldataCC)
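
initChannelConfig now fixes the channel's DaType up front: with AltDA enabled the batcher must be configured for calldata, which maps to DaTypeAltDA; otherwise the blobs/auto and calldata settings map to DaTypeBlob and DaTypeCalldata. A condensed sketch of that mapping, with plain strings standing in for the flags package constants (BlobsType, AutoType, CalldataType):

package main

import (
    "errors"
    "fmt"
)

type DaType int

const (
    DaTypeCalldata DaType = iota
    DaTypeBlob
    DaTypeAltDA
)

// pickDaType mirrors the reworked selection in initChannelConfig.
func pickDaType(useAltDA bool, daFlag string) (DaType, error) {
    if useAltDA {
        if daFlag == "calldata" {
            return DaTypeAltDA, nil
        }
        return 0, errors.New("altDA is currently only supported with calldata DA type")
    }
    switch daFlag {
    case "blobs", "auto":
        return DaTypeBlob, nil
    case "calldata":
        return DaTypeCalldata, nil
    default:
        return 0, fmt.Errorf("unknown data availability type: %v", daFlag)
    }
}

func main() {
    got, err := pickDaType(true, "calldata")
    fmt.Println(got == DaTypeAltDA, err) // true <nil>
}
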
2 changes: 1 addition & 1 deletion op-batcher/batcher/test_batch_submitter.go
@@ -28,7 +28,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
var candidate *txmgr.TxCandidate
var err error
cc := l.state.cfgProvider.ChannelConfig()
if cc.UseBlobs {
if cc.UseBlobs() {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
return err
11 changes: 10 additions & 1 deletion op-batcher/batcher/tx_data.go
@@ -8,14 +8,23 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)

// DaType determines how txData is submitted to L1.
type DaType int

const (
DaTypeCalldata DaType = iota
DaTypeBlob
DaTypeAltDA
)

// txData represents the data for a single transaction.
//
// Note: The batcher currently sends exactly one frame per transaction. This
// might change in the future to allow for multiple frames from possibly
// different channels.
type txData struct {
frames []frameData
asBlob bool // indicates whether this should be sent as blob
daType DaType
}

func singleFrameTxData(frame frameData) txData {
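
The three DaType values resolve to 0, 1 and 2 via iota. A short, self-contained sketch of the enum together with a hypothetical Stringer (not part of this diff) that would make the "da_type" log field print readable names:

package main

import "fmt"

type DaType int

const (
    DaTypeCalldata DaType = iota // 0: one frame per tx, sent as L1 calldata
    DaTypeBlob                   // 1: one blob per frame in a blob tx
    DaTypeAltDA                  // 2: frames posted to the DA provider, commitment sent to L1
)

// String is a hypothetical helper, not part of this commit.
func (d DaType) String() string {
    switch d {
    case DaTypeCalldata:
        return "calldata"
    case DaTypeBlob:
        return "blob"
    case DaTypeAltDA:
        return "altda"
    default:
        return fmt.Sprintf("DaType(%d)", int(d))
    }
}

func main() {
    fmt.Println(DaTypeCalldata, DaTypeBlob, DaTypeAltDA) // calldata blob altda
}
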
6 changes: 4 additions & 2 deletions op-batcher/flags/flags.go
@@ -82,8 +82,10 @@ EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"),
EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"),
}
TargetNumFramesFlag = &cli.IntFlag{
Name: "target-num-frames",
Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.",
Name: "target-num-frames",
Usage: "The target number of frames to create per channel. " +
"Controls number of blobs per blob tx, if using Blob DA, " +
"or number of frames per blob, if using altDA.",
Value: 1,
EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"),
}
