From bb0ba934bac03728e885ca873cb9b5105ad8920b Mon Sep 17 00:00:00 2001 From: Samuel Laferriere Date: Wed, 9 Oct 2024 15:23:15 +0100 Subject: [PATCH] feat(batcher): multi-frame altda channels --- op-batcher/batcher/channel.go | 6 +-- op-batcher/batcher/channel_config.go | 11 +++-- .../batcher/channel_config_provider_test.go | 3 +- op-batcher/batcher/channel_manager.go | 10 ++--- op-batcher/batcher/channel_manager_test.go | 9 +++-- op-batcher/batcher/channel_test.go | 4 +- op-batcher/batcher/driver.go | 29 ++++++-------- op-batcher/batcher/service.go | 40 ++++++++++++------- op-batcher/batcher/test_batch_submitter.go | 2 +- op-batcher/batcher/tx_data.go | 11 ++++- op-batcher/flags/flags.go | 6 ++- 11 files changed, 76 insertions(+), 55 deletions(-) diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index 95abcb46a7fa..17278590b727 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -132,14 +132,14 @@ func (c *channel) ID() derive.ChannelID { // NextTxData should only be called after HasTxData returned true. func (c *channel) NextTxData() txData { nf := c.cfg.MaxFramesPerTx() - txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs} + txdata := txData{frames: make([]frameData, 0, nf), daType: c.cfg.DaType} for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ { frame := c.channelBuilder.NextFrame() txdata.frames = append(txdata.frames, frame) } id := txdata.ID().String() - c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob) + c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType) c.pendingTransactions[id] = txdata return txdata @@ -147,7 +147,7 @@ func (c *channel) NextTxData() txData { func (c *channel) HasTxData() bool { if c.IsFull() || // If the channel is full, we should start to submit it - !c.cfg.UseBlobs { // If using calldata, we only send one frame per tx + c.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx return c.channelBuilder.HasPendingFrame() } // Collect enough frames if channel is not full yet diff --git a/op-batcher/batcher/channel_config.go b/op-batcher/batcher/channel_config.go index e62ea26eee45..dee7fa09f0b3 100644 --- a/op-batcher/batcher/channel_config.go +++ b/op-batcher/batcher/channel_config.go @@ -46,9 +46,12 @@ type ChannelConfig struct { // BatchType indicates whether the channel uses SingularBatch or SpanBatch. BatchType uint - // UseBlobs indicates that this channel should be sent as a multi-blob - // transaction with one blob per frame. - UseBlobs bool + // DaType indicates how the frames in this channel should be sent to the L1. + DaType DaType +} + +func (cc ChannelConfig) UseBlobs() bool { + return cc.DaType == DaTypeBlob } // ChannelConfig returns a copy of the receiver. @@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() { } func (cc *ChannelConfig) MaxFramesPerTx() int { - if !cc.UseBlobs { + if cc.DaType == DaTypeCalldata { return 1 } return cc.TargetNumFrames diff --git a/op-batcher/batcher/channel_config_provider_test.go b/op-batcher/batcher/channel_config_provider_test.go index 169d122e210a..a8f4af60f49b 100644 --- a/op-batcher/batcher/channel_config_provider_test.go +++ b/op-batcher/batcher/channel_config_provider_test.go @@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) { calldataCfg := ChannelConfig{ MaxFrameSize: 120_000 - 1, TargetNumFrames: 1, + DaType: DaTypeCalldata, } blobCfg := ChannelConfig{ MaxFrameSize: eth.MaxBlobDataSize - 1, TargetNumFrames: 3, // gets closest to amortized fixed tx costs - UseBlobs: true, + DaType: DaTypeBlob, } tests := []struct { diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 81ee0fb35a51..e3c6b73c3038 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -214,16 +214,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { newCfg := s.cfgProvider.ChannelConfig() // No change: - if newCfg.UseBlobs == s.defaultCfg.UseBlobs { + if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() { s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type", - "useBlobs", s.defaultCfg.UseBlobs) + "useBlobs", s.defaultCfg.UseBlobs()) return s.nextTxData(channel) } // Change: s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...", - "useBlobsBefore", s.defaultCfg.UseBlobs, - "useBlobsAfter", newCfg.UseBlobs) + "useBlobsBefore", s.defaultCfg.UseBlobs(), + "useBlobsAfter", newCfg.UseBlobs()) // Invalidate the channel so its blocks // get requeued: @@ -326,7 +326,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { "compression_algo", cfg.CompressorConfig.CompressionAlgo, "target_num_frames", cfg.TargetNumFrames, "max_frame_size", cfg.MaxFrameSize, - "use_blobs", cfg.UseBlobs, + "da_type", cfg.DaType, ) s.metr.RecordChannelOpened(pc.ID(), s.blocks.Len()) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 32aae1b06dd1..5158c5e34ee5 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -290,11 +290,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger, calldataCfg := ChannelConfig{ MaxFrameSize: 120_000 - 1, TargetNumFrames: 1, + DaType: DaTypeCalldata, } blobCfg := ChannelConfig{ MaxFrameSize: eth.MaxBlobDataSize - 1, TargetNumFrames: 3, // gets closest to amortized fixed tx costs - UseBlobs: true, + DaType: DaTypeBlob, } calldataCfg.InitNoneCompressor() blobCfg.InitNoneCompressor() @@ -348,7 +349,7 @@ func TestChannelManager_TxData(t *testing.T) { cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs) + require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob) // Seed channel manager with a block rng := rand.New(rand.NewSource(99)) @@ -385,8 +386,8 @@ func TestChannelManager_TxData(t *testing.T) { } require.Equal(t, tc.numExpectedAssessments, cfg.assessments) - require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob) - require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs) + require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob) + require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob) }) } diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index b36ce9311bce..5e1a4414ada6 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -131,7 +131,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { const n = 6 lgr := testlog.Logger(t, log.LevelWarn) ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{ - UseBlobs: false, + DaType: DaTypeCalldata, TargetNumFrames: n, CompressorConfig: compressor.Config{ CompressionAlgo: derive.Zlib, @@ -172,7 +172,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { const n = eth.MaxBlobsPerBlobTx lgr := testlog.Logger(t, log.LevelWarn) ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{ - UseBlobs: true, + DaType: DaTypeBlob, TargetNumFrames: n, CompressorConfig: compressor.Config{ CompressionAlgo: derive.Zlib, diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 93c1425c58bc..5b060cfbd5f4 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -765,14 +765,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh // publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1. func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { - // sanity checks - if nf := len(txdata.frames); nf != 1 { - l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) - } - if txdata.asBlob { - l.Log.Crit("Unexpected blob txdata with AltDA enabled") - } - // when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop // since it may take a while for the request to return. goroutineSpawned := daGroup.TryGo(func() error { @@ -810,16 +802,17 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t // The method will block if the queue's MaxPendingTransactions is exceeded. func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error { var err error - - // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. - if l.Config.UseAltDA { + var candidate *txmgr.TxCandidate + switch txdata.daType { + case DaTypeAltDA: + if !l.Config.UseAltDA { + l.Log.Crit("Received AltDA type txdata without AltDA being enabled") + } + // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup) // we return nil to allow publishStateToL1 to keep processing the next txdata return nil - } - - var candidate *txmgr.TxCandidate - if txdata.asBlob { + case DaTypeBlob: if candidate, err = l.blobTxCandidate(txdata); err != nil { // We could potentially fall through and try a calldata tx instead, but this would // likely result in the chain spending more in gas fees than it is tuned for, so best @@ -827,12 +820,14 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef // or configuration issue. return fmt.Errorf("could not create blob tx candidate: %w", err) } - } else { + case DaTypeCalldata: // sanity check if nf := len(txdata.frames); nf != 1 { l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) } candidate = l.calldataTxCandidate(txdata.CallData()) + default: + l.Log.Crit("Unknown DA type", "da_type", txdata.daType) } l.sendTx(txdata, false, candidate, queue, receiptsCh) @@ -850,7 +845,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T candidate.GasLimit = intrinsicGas } - queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh) + queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh) } func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) { diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 1b150642261f..65cdc8d52963 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -218,30 +218,40 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { TargetNumFrames: cfg.TargetNumFrames, SubSafetyMargin: cfg.SubSafetyMargin, BatchType: cfg.BatchType, + // DaType: set below } - switch cfg.DataAvailabilityType { - case flags.BlobsType, flags.AutoType: - if !cfg.TestUseMaxTxSizeForBlobs { - // account for version byte prefix - cc.MaxFrameSize = eth.MaxBlobDataSize - 1 + if bs.UseAltDA { + if cfg.DataAvailabilityType == flags.CalldataType { + cc.DaType = DaTypeAltDA + } else { + return fmt.Errorf("altDA is currently only supported with calldata DA Type") } - cc.UseBlobs = true - case flags.CalldataType: // do nothing - default: - return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType) - } + if cc.MaxFrameSize > altda.MaxInputSize { + return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize) + } + } else { - if bs.UseAltDA && cc.MaxFrameSize > altda.MaxInputSize { - return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize) + switch cfg.DataAvailabilityType { + case flags.BlobsType, flags.AutoType: + if !cfg.TestUseMaxTxSizeForBlobs { + // account for version byte prefix + cc.MaxFrameSize = eth.MaxBlobDataSize - 1 + } + cc.DaType = DaTypeBlob + case flags.CalldataType: // do nothing + cc.DaType = DaTypeCalldata + default: + return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType) + } } cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo) - if cc.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { + if cc.UseBlobs() && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { return errors.New("cannot use Blobs before Ecotone") } - if !cc.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { + if !cc.UseBlobs() && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!") } @@ -273,7 +283,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { calldataCC := cc calldataCC.TargetNumFrames = 1 calldataCC.MaxFrameSize = 120_000 - calldataCC.UseBlobs = false + calldataCC.DaType = DaTypeCalldata calldataCC.ReinitCompressorConfig() bs.ChannelConfig = NewDynamicEthChannelConfig(bs.Log, 10*time.Second, bs.TxManager, cc, calldataCC) diff --git a/op-batcher/batcher/test_batch_submitter.go b/op-batcher/batcher/test_batch_submitter.go index 9ff5ca69796f..deb9fd245c11 100644 --- a/op-batcher/batcher/test_batch_submitter.go +++ b/op-batcher/batcher/test_batch_submitter.go @@ -28,7 +28,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error { var candidate *txmgr.TxCandidate var err error cc := l.state.cfgProvider.ChannelConfig() - if cc.UseBlobs { + if cc.UseBlobs() { candidate = l.calldataTxCandidate([]byte{}) } else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil { return err diff --git a/op-batcher/batcher/tx_data.go b/op-batcher/batcher/tx_data.go index d0f5474fd5f2..1dccf2b3f29a 100644 --- a/op-batcher/batcher/tx_data.go +++ b/op-batcher/batcher/tx_data.go @@ -8,6 +8,15 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" ) +// DaType determines how txData is submitted to L1. +type DaType int + +const ( + DaTypeCalldata DaType = iota + DaTypeBlob + DaTypeAltDA +) + // txData represents the data for a single transaction. // // Note: The batcher currently sends exactly one frame per transaction. This @@ -15,7 +24,7 @@ import ( // different channels. type txData struct { frames []frameData - asBlob bool // indicates whether this should be sent as blob + daType DaType } func singleFrameTxData(frame frameData) txData { diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index d5681ea87234..d87cafd63c06 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -82,8 +82,10 @@ var ( EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"), } TargetNumFramesFlag = &cli.IntFlag{ - Name: "target-num-frames", - Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.", + Name: "target-num-frames", + Usage: "The target number of frames to create per channel. " + + "Controls number of blobs per blob tx, if using Blob DA, " + + "or number of frames per blob, if using altDA.", Value: 1, EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"), }