Skip to content

Commit

Permalink
[v2] Inabox test (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 11, 2024
1 parent 980a866 commit eeae72a
Show file tree
Hide file tree
Showing 33 changed files with 1,040 additions and 191 deletions.
40 changes: 26 additions & 14 deletions api/clients/disperser_client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clients
import (
"context"
"fmt"
"math/big"
"sync"

"github.com/Layr-Labs/eigenda/api"
Expand Down Expand Up @@ -124,15 +125,17 @@ func (c *disperserClientV2) DisperseBlob(
if c.signer == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}
if c.accountant == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized accountant for paid dispersal; make sure to call PopulateAccountant after creating the client")
}

symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums, salt)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}
// TODO(hopeyen): uncomment this after the accountant is implemented
// if c.accountant == nil {
// return nil, [32]byte{}, api.NewErrorInternal("uninitialized accountant for paid dispersal; make sure to call PopulateAccountant after creating the client")
// }

// symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
// payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums, salt)
// if err != nil {
// return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
// }

if len(quorums) == 0 {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
Expand Down Expand Up @@ -171,17 +174,26 @@ func (c *disperserClientV2) DisperseBlob(
}
}

var payment core.PaymentMetadata
accountId, err := c.signer.GetAccountID()
if err != nil {
return nil, [32]byte{}, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
}
payment.AccountID = accountId
payment.ReservationPeriod = 0
payment.CumulativePayment = big.NewInt(0)
blobHeader := &corev2.BlobHeader{
BlobVersion: blobVersion,
BlobCommitments: blobCommitments,
QuorumNumbers: quorums,
PaymentMetadata: *payment,
}
sig, err := c.signer.SignBlobRequest(blobHeader)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error signing blob request: %w", err)
PaymentMetadata: payment,
}
blobHeader.Signature = sig
// TODO(hopeyen): uncomment this and replace the payment metadata for authentication
// sig, err := c.signer.SignBlobRequest(blobHeader)
// if err != nil {
// return nil, [32]byte{}, fmt.Errorf("error signing blob request: %w", err)
// }
// blobHeader.Signature = sig
blobHeaderProto, err := blobHeader.ToProtobuf()
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error converting blob header to protobuf: %w", err)
Expand Down
10 changes: 10 additions & 0 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,9 @@ type PaymentMetadata struct {

// Hash returns the Keccak256 hash of the PaymentMetadata
func (pm *PaymentMetadata) Hash() ([32]byte, error) {
if pm == nil {
return [32]byte{}, errors.New("payment metadata is nil")
}
blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{
{
Name: "accountID",
Expand Down Expand Up @@ -543,6 +546,10 @@ func (pm *PaymentMetadata) Hash() ([32]byte, error) {
}

func (pm *PaymentMetadata) MarshalDynamoDBAttributeValue() (types.AttributeValue, error) {
if pm == nil {
return nil, errors.New("payment metadata is nil")
}

return &types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"AccountID": &types.AttributeValueMemberS{Value: pm.AccountID},
Expand Down Expand Up @@ -576,6 +583,9 @@ func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeVal
}

func (pm *PaymentMetadata) ToProtobuf() *commonpb.PaymentHeader {
if pm == nil {
return nil
}
return &commonpb.PaymentHeader{
AccountId: pm.AccountID,
ReservationPeriod: pm.ReservationPeriod,
Expand Down
16 changes: 15 additions & 1 deletion core/thegraph/state_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -58,9 +59,22 @@ func setup() {
fmt.Println("Deploying experiment")
testConfig.DeployExperiment()

pk := testConfig.Pks.EcdsaMap["default"].PrivateKey
pk = strings.TrimPrefix(pk, "0x")
pk = strings.TrimPrefix(pk, "0X")
ethClient, err := geth.NewMultiHomingClient(geth.EthClientConfig{
RPCURLs: []string{testConfig.Deployers[0].RPC},
PrivateKeyString: pk,
NumConfirmations: 0,
NumRetries: 1,
}, gethcommon.Address{}, logging.NewNoopLogger())
if err != nil {
panic(err)
}
_ = testConfig.RegisterBlobVersionAndRelays(ethClient)

fmt.Println("Starting binaries")
testConfig.StartBinaries()

}

func teardown() {
Expand Down
2 changes: 1 addition & 1 deletion disperser/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
clean:
rm -rf ./bin

build: build_server build_batcher build_encoder build_dataapi
build: build_server build_batcher build_encoder build_dataapi build_controller

build_batcher:
go build -o ./bin/batcher ./cmd/batcher
Expand Down
51 changes: 25 additions & 26 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package apiserver
import (
"context"
"fmt"
"math/big"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/encoding"
Expand Down Expand Up @@ -123,33 +121,34 @@ func (s *DispersalServerV2) validateDispersalRequest(ctx context.Context, req *p
if err != nil {
return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob header: %s", err.Error()))
}
if err = s.authenticator.AuthenticateBlobRequest(blobHeader); err != nil {
return api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error()))
}
// TODO(ian-shim): enable this check for authentication
// if blobHeader.PaymentMetadata == nil {
// return api.NewErrorInvalidArg("payment metadata is required")
// }
// if err = s.authenticator.AuthenticateBlobRequest(blobHeader); err != nil {
// return api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error()))
// }

if len(blobHeader.PaymentMetadata.AccountID) == 0 || blobHeader.PaymentMetadata.ReservationPeriod == 0 || blobHeader.PaymentMetadata.CumulativePayment == nil {
return api.NewErrorInvalidArg("invalid payment metadata")
}
// TODO(ian-shim): enable this check when we have payment metadata + authentication in disperser client
// if len(blobHeader.PaymentMetadata.AccountID) == 0 || blobHeader.PaymentMetadata.ReservationPeriod == 0 || blobHeader.PaymentMetadata.CumulativePayment == nil {
// return api.NewErrorInvalidArg("invalid payment metadata")
// }

// handle payments and check rate limits
if blobHeaderProto.GetPaymentHeader() != nil {
reservationPeriod := blobHeaderProto.GetPaymentHeader().GetReservationPeriod()
cumulativePayment := new(big.Int).SetBytes(blobHeaderProto.GetPaymentHeader().GetCumulativePayment())
accountID := blobHeaderProto.GetPaymentHeader().GetAccountId()

paymentHeader := core.PaymentMetadata{
AccountID: accountID,
ReservationPeriod: reservationPeriod,
CumulativePayment: cumulativePayment,
}

err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
if err != nil {
return api.NewErrorResourceExhausted(err.Error())
}
} else {
return api.NewErrorInvalidArg("payment header is required")
}
// reservationPeriod := blobHeaderProto.GetPaymentHeader().GetReservationPeriod()
// cumulativePayment := new(big.Int).SetBytes(blobHeaderProto.GetPaymentHeader().GetCumulativePayment())
// accountID := blobHeaderProto.GetPaymentHeader().GetAccountId()

// paymentHeader := core.PaymentMetadata{
// AccountID: accountID,
// ReservationPeriod: reservationPeriod,
// CumulativePayment: cumulativePayment,
// }

// err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
// if err != nil {
// return api.NewErrorResourceExhausted(err.Error())
// }

commitments, err := s.prover.GetCommitmentsForPaddedLength(data)
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math/big"
"net"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/common/aws/s3"
Expand Down Expand Up @@ -203,7 +204,9 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) {
Data: data,
BlobHeader: invalidReqProto,
})
assert.ErrorContains(t, err, "authentication failed")
// TODO(hopeyen); re-enable this validation after adding signature verification
// assert.ErrorContains(t, err, "authentication failed")
assert.NoError(t, err)

// request with invalid payment metadata
invalidReqProto = &pbcommonv2.BlobHeader{
Expand All @@ -226,7 +229,9 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) {
Data: data,
BlobHeader: invalidReqProto,
})
assert.ErrorContains(t, err, "invalid payment metadata")
// TODO(ian-shim): re-enable this validation after fixing the payment metadata validation
// assert.ErrorContains(t, err, "invalid payment metadata")
assert.NoError(t, err)

// request with invalid commitment
invalidCommitment := commitmentProto
Expand Down
4 changes: 3 additions & 1 deletion disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}

d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas))
if len(blobMetadatas) == 0 {
return nil, errNoBlobsToDispatch
}
Expand Down Expand Up @@ -317,7 +318,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

if len(certs) != len(keys) {
return nil, fmt.Errorf("blob certificates not found for all blob keys")
return nil, fmt.Errorf("blob certificates (%d) not found for all blob keys (%d)", len(certs), len(keys))
}

certsMap := make(map[corev2.BlobKey]*corev2.BlobCertificate, len(certs))
Expand Down Expand Up @@ -410,6 +411,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
d.cursor = cursor
}

d.logger.Debug("new batch", "referenceBlockNumber", referenceBlockNumber, "numBlobs", len(certs))
return &batchData{
Batch: &corev2.Batch{
BatchHeader: batchHeader,
Expand Down
4 changes: 3 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math"
"math/rand"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand Down Expand Up @@ -164,6 +165,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {

submissionStart := time.Now()

e.logger.Debug("request encoding", "numBlobs", len(blobMetadatas))
for _, blob := range blobMetadatas {
blob := blob
blobKey, err := blob.BlobHeader.BlobKey()
Expand Down
8 changes: 4 additions & 4 deletions disperser/encoder/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p
if len(data) == 0 {
return nil, status.Error(codes.NotFound, "blob length is zero")
}
s.logger.Info("fetched blob", "duration", time.Since(fetchStart))
s.logger.Info("fetched blob", "duration", time.Since(fetchStart).String())

// Encode the data
encodingStart := time.Now()
Expand All @@ -155,7 +155,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p
s.logger.Error("failed to encode frames", "error", err)
return nil, status.Errorf(codes.Internal, "encoding failed: %v", err)
}
s.logger.Info("encoding frames", "duration", time.Since(encodingStart))
s.logger.Info("encoding frames", "duration", time.Since(encodingStart).String())

// Process and store results
return s.processAndStoreResults(ctx, blobKey, frames)
Expand Down Expand Up @@ -221,15 +221,15 @@ func (s *EncoderServerV2) processAndStoreResults(ctx context.Context, blobKey co
if err := s.chunkWriter.PutChunkProofs(ctx, blobKey, proofs); err != nil {
return nil, status.Errorf(codes.Internal, "failed to upload chunk proofs: %v", err)
}
s.logger.Info("stored proofs", "duration", time.Since(storeStart))
s.logger.Info("stored proofs", "duration", time.Since(storeStart).String())

// Store coefficients
coeffStart := time.Now()
fragmentInfo, err := s.chunkWriter.PutChunkCoefficients(ctx, blobKey, coeffs)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to upload chunk coefficients: %v", err)
}
s.logger.Info("stored coefficients", "duration", time.Since(coeffStart))
s.logger.Info("stored coefficients", "duration", time.Since(coeffStart).String())

return &pb.EncodeBlobReply{
FragmentInfo: &pb.FragmentInfo{
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ require (
github.com/urfave/cli/v2 v2.27.4
github.com/wealdtech/go-merkletree/v2 v2.6.0
go.uber.org/automaxprocs v1.5.2
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/sync v0.8.0
Expand Down
Loading

0 comments on commit eeae72a

Please sign in to comment.