-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement v2 client GET functionality #972
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package mock | ||
|
||
import mock "github.com/stretchr/testify/mock" | ||
|
||
// BlobCodec is an autogenerated mock type for the BlobCodec type | ||
type BlobCodec struct { | ||
mock.Mock | ||
} | ||
|
||
// DecodeBlob provides a mock function with given fields: encodedData | ||
func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) { | ||
ret := _m.Called(encodedData) | ||
|
||
if len(ret) == 0 { | ||
panic("no return value specified for DecodeBlob") | ||
} | ||
|
||
var r0 []byte | ||
var r1 error | ||
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok { | ||
return rf(encodedData) | ||
} | ||
if rf, ok := ret.Get(0).(func([]byte) []byte); ok { | ||
r0 = rf(encodedData) | ||
} else { | ||
if ret.Get(0) != nil { | ||
r0 = ret.Get(0).([]byte) | ||
} | ||
} | ||
|
||
if rf, ok := ret.Get(1).(func([]byte) error); ok { | ||
r1 = rf(encodedData) | ||
} else { | ||
r1 = ret.Error(1) | ||
} | ||
|
||
return r0, r1 | ||
} | ||
|
||
// EncodeBlob provides a mock function with given fields: rawData | ||
func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) { | ||
ret := _m.Called(rawData) | ||
|
||
if len(ret) == 0 { | ||
panic("no return value specified for EncodeBlob") | ||
} | ||
|
||
var r0 []byte | ||
var r1 error | ||
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok { | ||
return rf(rawData) | ||
} | ||
if rf, ok := ret.Get(0).(func([]byte) []byte); ok { | ||
r0 = rf(rawData) | ||
} else { | ||
if ret.Get(0) != nil { | ||
r0 = ret.Get(0).([]byte) | ||
} | ||
} | ||
|
||
if rf, ok := ret.Get(1).(func([]byte) error); ok { | ||
r1 = rf(rawData) | ||
} else { | ||
r1 = ret.Error(1) | ||
} | ||
|
||
return r0, r1 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package clients | ||
|
||
import ( | ||
"github.com/Layr-Labs/eigenda/api/clients/codecs" | ||
) | ||
|
||
// VerificationMode enumerates the different ways that a blob may be encoded/decoded between the client
// and the disperser.
type VerificationMode uint

// TODO: expand these docs for IFFT and NoIFFT once the exact semantics have been confirmed.
const (
	// IFFT is the zero value of VerificationMode. In this mode the client codec does an IFFT on blobs
	// before they are dispersed, and an FFT on blobs after receiving them.
	IFFT VerificationMode = iota

	// NoIFFT selects the codec which does not apply the IFFT/FFT step. In this mode the blob must be
	// supplied in its entirety to verify that any part of the data matches the KZG commitment.
	NoIFFT
)
|
||
// EigenDAClientConfigV2 contains configuration values for EigenDAClientV2 | ||
type EigenDAClientConfigV2 struct { | ||
// The blob encoding version to use when writing and reading blobs | ||
BlobEncodingVersion codecs.BlobEncodingVersion | ||
|
||
// If PointVerificationMode is IFFT, then the client codec will do an IFFT on blobs before they are dispersed, and | ||
// will do an FFT on blobs after receiving them. This makes it possible to open points on the KZG commitment to prove | ||
// that the field elements correspond to the commitment. | ||
// | ||
// If PointVerificationMode is NoIFFT, the blob must be supplied in its entirety, to perform a verification | ||
// that any part of the data matches the KZG commitment. | ||
PointVerificationMode VerificationMode | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,148 @@ | ||||||
package clients | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
"github.com/Layr-Labs/eigenda/api/clients/codecs" | ||||||
corev2 "github.com/Layr-Labs/eigenda/core/v2" | ||||||
"github.com/Layr-Labs/eigensdk-go/logging" | ||||||
"github.com/cockroachdb/errors/join" | ||||||
"math/rand" | ||||||
) | ||||||
|
||||||
// EigenDAClientV2 provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser. | ||||||
// | ||||||
// This struct is not threadsafe. | ||||||
type EigenDAClientV2 struct { | ||||||
log logging.Logger | ||||||
// doesn't need to be cryptographically secure, as it's only used to distribute load across relays | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. out of scope for this PR but curious if we'd ever wanna let users define their own retrieval policies when communicating with relays |
||||||
random *rand.Rand | ||||||
config *EigenDAClientConfigV2 | ||||||
codec codecs.BlobCodec | ||||||
relayClient RelayClient | ||||||
} | ||||||
|
||||||
// BuildEigenDAClientV2 builds an EigenDAClientV2 from config structs. | ||||||
func BuildEigenDAClientV2( | ||||||
log logging.Logger, | ||||||
config *EigenDAClientConfigV2, | ||||||
relayClientConfig *RelayClientConfig) (*EigenDAClientV2, error) { | ||||||
|
||||||
relayClient, err := NewRelayClient(relayClientConfig, log) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("new relay client: %w", err) | ||||||
} | ||||||
|
||||||
codec, err := createCodec(config) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
|
||||||
return NewEigenDAClientV2(log, rand.New(rand.NewSource(rand.Int63())), config, relayClient, codec) | ||||||
} | ||||||
|
||||||
// NewEigenDAClientV2 assembles an EigenDAClientV2 from subcomponents that have already been constructed and initialized. | ||||||
func NewEigenDAClientV2( | ||||||
log logging.Logger, | ||||||
random *rand.Rand, | ||||||
config *EigenDAClientConfigV2, | ||||||
relayClient RelayClient, | ||||||
codec codecs.BlobCodec) (*EigenDAClientV2, error) { | ||||||
|
||||||
return &EigenDAClientV2{ | ||||||
log: log, | ||||||
random: random, | ||||||
config: config, | ||||||
codec: codec, | ||||||
relayClient: relayClient, | ||||||
}, nil | ||||||
} | ||||||
|
||||||
// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate. | ||||||
// | ||||||
// The relays are attempted in random order. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. curious why this choice. How is the relayKey list populated? Shouldnt we let the person populating be able to dictate some preference on the relays by iterating through them in normal order? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My logic (which may be based on incorrect assumptions) is that, since the relayKey list is part of the cert, the order would the same for every client. It seems we wouldn't want all clients to attempt the first relay in the list, rather we should distribute load across all available relays. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see. I think this makes sense. Although now I just realized I'm confused myself. @ian-shim does this make sense? Are the relayKeys meant to be for data replication or sharding? Aka do we need to hit all the relayKeys to get all the blobs, or any ONE should do? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The disperser implementation itself shuffles the relay keys, so attempting each relay in order is actually fine. But it's good that we're not assuming that relay keys aren't ordered in any particular way.
Any one should do! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@ian-shim The randomization is done while creating the cert, and all clients will be given the same cert from the disperser, right? So, for a given blob that can be served by relays Is this the correct understanding, and if so, are you saying this behavior would be ok? |
||||||
// | ||||||
// The returned blob is decoded. | ||||||
func (c *EigenDAClientV2) GetBlob( | ||||||
ctx context.Context, | ||||||
blobKey corev2.BlobKey, | ||||||
blobCertificate corev2.BlobCertificate) ([]byte, error) { | ||||||
|
||||||
relayKeyCount := len(blobCertificate.RelayKeys) | ||||||
samlaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
if relayKeyCount == 0 { | ||||||
return nil, errors.New("relay key count is zero") | ||||||
} | ||||||
|
||||||
// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit | ||||||
indices := c.random.Perm(relayKeyCount) | ||||||
|
||||||
// TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond maliciously) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good to think about but probably pretty low priority until we have relays that we don't run ourselves There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this sort of TODO comment acceptable to leave in production code? |
||||||
|
||||||
// iterate over relays in random order, until we are able to get the blob from someone | ||||||
for _, val := range indices { | ||||||
relayKey := blobCertificate.RelayKeys[val] | ||||||
|
||||||
// TODO: does this need a timeout? | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes! |
||||||
data, err := c.relayClient.GetBlob(ctx, relayKey, blobKey) | ||||||
Comment on lines
+87
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes - grpc clients by default don't have a default timeout iiuc: https://medium.com/geekculture/timeout-context-in-go-e88af0abd08d |
||||||
|
||||||
// if GetBlob returned an error, try calling a different relay | ||||||
if err != nil { | ||||||
c.log.Warn("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err) | ||||||
continue | ||||||
} | ||||||
Comment on lines
+90
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about the circumstance where the error is transient and the # of relay keys == 1? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you suggesting that we have an additional timeout, during which the client repeatedly retries all relays? I could implement this if it's the way we want to go- but I don't see how the case |
||||||
|
||||||
// An honest relay should never send an empty blob | ||||||
if len(data) == 0 { | ||||||
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey) | ||||||
continue | ||||||
} | ||||||
|
||||||
// An honest relay should never send a blob which cannot be decoded | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To expand these invariants: an honest relay should never send a blob which doesn't respect its polynomial commitments. The thing is though this check would get caught upstream (i.e, within proxy directly) and probably cause the request to fail. The proxy client would trigger a retry which would probably route to another relay. this isn't a big problem rn and we can just document it somewhere for circle back sometime in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any reason not to check this invariant here, included in this PR? Seems like it wouldn't be hard to add |
||||||
decodedData, err := c.codec.DecodeBlob(data) | ||||||
if err != nil { | ||||||
c.log.Warn("error decoding blob from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err) | ||||||
continue | ||||||
} | ||||||
|
||||||
return decodedData, nil | ||||||
} | ||||||
|
||||||
return nil, fmt.Errorf("unable to retrieve blob from any relay. relay count: %d", relayKeyCount) | ||||||
} | ||||||
|
||||||
// GetCodec returns the codec the client uses for encoding and decoding blobs | ||||||
func (c *EigenDAClientV2) GetCodec() codecs.BlobCodec { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this getter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I carried it forward from v1 client - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we'll still need it. We use it in proxy to get the codec and IFFT blobs when computing commitments. |
||||||
return c.codec | ||||||
} | ||||||
|
||||||
// Close is responsible for calling close on all internal clients. This method will do its best to close all internal | ||||||
// clients, even if some closes fail. | ||||||
// | ||||||
// Any and all errors returned from closing internal clients will be joined and returned. | ||||||
// | ||||||
// This method should only be called once. | ||||||
func (c *EigenDAClientV2) Close() error { | ||||||
relayClientErr := c.relayClient.Close() | ||||||
|
||||||
// TODO: this is using join, since there will be more subcomponents requiring closing after adding PUT functionality | ||||||
return join.Join(relayClientErr) | ||||||
} | ||||||
|
||||||
// createCodec creates the codec based on client config values | ||||||
func createCodec(config *EigenDAClientConfigV2) (codecs.BlobCodec, error) { | ||||||
lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.BlobEncodingVersion) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("create low level codec: %w", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. knit:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. knit^2: disagree here, been trying to enforce https://github.com/uber-go/guide/blob/master/style.md#error-wrapping |
||||||
} | ||||||
|
||||||
switch config.PointVerificationMode { | ||||||
case NoIFFT: | ||||||
return codecs.NewNoIFFTCodec(lowLevelCodec), nil | ||||||
case IFFT: | ||||||
return codecs.NewIFFTCodec(lowLevelCodec), nil | ||||||
default: | ||||||
return nil, fmt.Errorf("unsupported point verification mode: %d", config.PointVerificationMode) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment on the thread safety of this struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment stating the struct is not threadsafe in a48afb1