Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement v2 client GET functionality #972

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
68 changes: 68 additions & 0 deletions api/clients/codecs/mock/BlobCodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package mock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: filenames should be snakecase i.e. blob_codec.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed a48afb1


import mock "github.com/stretchr/testify/mock"

// BlobCodec is an autogenerated mock type for the BlobCodec type
type BlobCodec struct {
mock.Mock
}

// DecodeBlob provides a mock function with given fields: encodedData
func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) {
ret := _m.Called(encodedData)

if len(ret) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it expect leng(ret) == 2 since you also access Get(1)?

Copy link
Contributor Author

@litt3 litt3 Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the reason to only check ==0 here is to yield a more informative panic in the specific case where no return value is specified. The call to Get(1) later contains a length check under the hood, and would just yield a different message.

(for context, this mock was autogenerated)

panic("no return value specified for DecodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(encodedData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(encodedData)
} else {
if ret.Get(0) != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: control flow may be simplified as else if ret.Get(0) != nil { ... }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file was auto generated by mockery. Are you ok with leaving the auto generated code as is?

Let me know if you'd prefer I make these changes, and I will apply your suggestions.

r0 = ret.Get(0).([]byte)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: may check the casting is done correctly r0, ok = ret.Get(0).([]byte)

}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(encodedData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// EncodeBlob provides a mock function with given fields: rawData
func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
ret := _m.Called(rawData)

if len(ret) == 0 {
panic("no return value specified for EncodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(rawData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(rawData)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(rawData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}
142 changes: 142 additions & 0 deletions api/clients/eigenda_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package clients

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/hashicorp/go-multierror"
"math/rand"
)

// EigenDAClientV2 provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser.
type EigenDAClientV2 interface {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
GetBlob(ctx context.Context, blobKey corev2.BlobKey, blobCertificate corev2.BlobCertificate) ([]byte, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only need the BlobCertificate for the relay keys, I think we should just take the relayKeys slice directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might need more data from the certificate for verification (though I'm not sure yet what is required). If it turns out we don't need the cert for anything else, I'll make the suggested change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense! I'll keep this convo unresolved as a reminder for the next review when you commit the POST code

GetCodec() codecs.BlobCodec
Close() error
samlaf marked this conversation as resolved.
Show resolved Hide resolved
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

type eigenDAClientV2 struct {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
clientConfig EigenDAClientConfig
samlaf marked this conversation as resolved.
Show resolved Hide resolved
log logging.Logger
relayClient RelayClient
codec codecs.BlobCodec
}

var _ EigenDAClientV2 = &eigenDAClientV2{}

// BuildEigenDAClientV2 builds an EigenDAClientV2 from config structs.
func BuildEigenDAClientV2(
log logging.Logger,
clientConfig EigenDAClientConfig,
relayClientConfig RelayClientConfig) (EigenDAClientV2, error) {

err := clientConfig.CheckAndSetDefaults()
if err != nil {
return nil, err
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

relayClient, err := NewRelayClient(&relayClientConfig, log)
samlaf marked this conversation as resolved.
Show resolved Hide resolved

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(clientConfig.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
}
var codec codecs.BlobCodec
if clientConfig.DisablePointVerificationMode {
codec = codecs.NewNoIFFTCodec(lowLevelCodec)
} else {
codec = codecs.NewIFFTCodec(lowLevelCodec)
}
samlaf marked this conversation as resolved.
Show resolved Hide resolved

return NewEigenDAClientV2(log, clientConfig, relayClient, codec)
}

// NewEigenDAClientV2 assembles an EigenDAClientV2 from subcomponents that have already been constructed and initialized.
func NewEigenDAClientV2(
log logging.Logger,
clientConfig EigenDAClientConfig,
relayClient RelayClient,
codec codecs.BlobCodec) (EigenDAClientV2, error) {

return &eigenDAClientV2{
clientConfig: clientConfig,
log: log,
relayClient: relayClient,
codec: codec,
}, nil
}

// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
//
// The relays are attempted in random order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why this choice. How is the relayKey list populated? Shouldnt we let the person populating be able to dictate some preference on the relays by iterating through them in normal order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My logic (which may be based on incorrect assumptions) is that, since the relayKey list is part of the cert, the order would the same for every client.

It seems we wouldn't want all clients to attempt the first relay in the list, rather we should distribute load across all available relays.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. I think this makes sense. Although now I just realized I'm confused myself. @ian-shim does this make sense? Are the relayKeys meant to be for data replication or sharding? Aka do we need to hit all the relayKeys to get all the blobs, or any ONE should do?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The disperser implementation itself shuffles the relay keys, so attempting each relay in order is actually fine. But it's good that we're not assuming that relay keys aren't ordered in any particular way.

Aka do we need to hit all the relayKeys to get all the blobs, or any ONE should do?

Any one should do!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attempting each relay in order is actually fine

@ian-shim The randomization is done while creating the cert, and all clients will be given the same cert from the disperser, right?

So, for a given blob that can be served by relays [X, Y, Z], if all clients were to attempt the relays in order, then they all would try to get the blob from relay X, and relays [Y, Z] wouldn't receive any load.

Is this the correct understanding, and if so, are you saying this behavior would be ok?

//
// The returned blob is decoded.
func (c *eigenDAClientV2) GetBlob(ctx context.Context, blobKey corev2.BlobKey, blobCertificate corev2.BlobCertificate) ([]byte, error) {
// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
random := rand.New(rand.NewSource(rand.Int63()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to go with this random appraoch (but see my other comment), maybe we should pass the source of randomness in the constructor and reuse it everywhere? To make tests deterministic.

Copy link
Contributor Author

@litt3 litt3 Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passed in a source of randomness 885c131

relayKeyCount := len(blobCertificate.RelayKeys)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
var indices []int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just simplify it: indices := rand.Perm(relayKeyCount)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done a48afb1

for i := 0; i < relayKeyCount; i++ {
indices = append(indices, i)
}
random.Shuffle(len(indices), func(i int, j int) {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
indices[i], indices[j] = indices[j], indices[i]
})

// TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond maliciously)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good to think about but probably pretty low priority until we have relays that we don't run ourselves

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sort of TODO comment acceptable to leave in production code?


// iterate over relays in random order, until we are able to get the blob from someone
for _, val := range indices {
relayKey := blobCertificate.RelayKeys[val]

data, err := c.relayClient.GetBlob(ctx, relayKey, blobKey)

// if GetBlob returned an error, try calling a different relay
if err != nil {
// TODO: should this log type be downgraded to debug to avoid log spam? I'm not sure how frequent retrieval
// from a relay will fail in practice (?)
c.log.Info("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// An honest relay should never send an empty blob
if len(data) == 0 {
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey)
continue
}

// An honest relay should never send a blob which cannot be decoded

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To expand these invariants: an honest relay should never send a blob which doesn't respect its polynomial commitments. The thing is though this check would get caught upstream (i.e, within proxy directly) and probably cause the request to fail. The proxy client would trigger a retry which would probably route to another relay.

this isn't a big problem rn and we can just document it somewhere for circle back sometime in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason not to check this invariant here, included in this PR? Seems like it wouldn't be hard to add

decodedData, err := c.codec.DecodeBlob(data)
if err != nil {
c.log.Warn("error decoding blob", "blobKey", blobKey, "relayKey", relayKey)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
continue
}

return decodedData, nil
}

return nil, fmt.Errorf("unable to retrieve blob from any relay")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

knit - including the relay count in the error msg could be useful context when debugging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

func (c *eigenDAClientV2) GetCodec() codecs.BlobCodec {
return c.codec
}

func (c *eigenDAClientV2) Close() error {
var errList *multierror.Error

// TODO: this is using a multierror, since there will be more subcomponents requiring closing after adding PUT functionality
samlaf marked this conversation as resolved.
Show resolved Hide resolved
relayClientErr := c.relayClient.Close()
if relayClientErr != nil {
errList = multierror.Append(errList, relayClientErr)
}

if errList != nil {
return errList.ErrorOrNil()
}

return nil
}
Loading
Loading