diff --git a/api/clients/codecs/mock/blob_codec.go b/api/clients/codecs/mock/blob_codec.go new file mode 100644 index 000000000..973669bff --- /dev/null +++ b/api/clients/codecs/mock/blob_codec.go @@ -0,0 +1,68 @@ +package mock + +import mock "github.com/stretchr/testify/mock" + +// BlobCodec is an autogenerated mock type for the BlobCodec type +type BlobCodec struct { + mock.Mock +} + +// DecodeBlob provides a mock function with given fields: encodedData +func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) { + ret := _m.Called(encodedData) + + if len(ret) == 0 { + panic("no return value specified for DecodeBlob") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok { + return rf(encodedData) + } + if rf, ok := ret.Get(0).(func([]byte) []byte); ok { + r0 = rf(encodedData) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(encodedData) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EncodeBlob provides a mock function with given fields: rawData +func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) { + ret := _m.Called(rawData) + + if len(ret) == 0 { + panic("no return value specified for EncodeBlob") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok { + return rf(rawData) + } + if rf, ok := ret.Get(0).(func([]byte) []byte); ok { + r0 = rf(rawData) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(rawData) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/api/clients/config_v2.go b/api/clients/config_v2.go new file mode 100644 index 000000000..7189ad05f --- /dev/null +++ b/api/clients/config_v2.go @@ -0,0 +1,29 @@ +package clients + +import ( + "github.com/Layr-Labs/eigenda/api/clients/codecs" +) + +// VerificationMode is an enum that represents the different ways that a blob may be encoded/decoded between +// the client and the disperser. +type VerificationMode uint + +const ( + // TODO: write good docs here for IFFT and NoIFFT (I need to update my understanding to be able to write this) + IFFT VerificationMode = iota + NoIFFT +) + +// EigenDAClientConfigV2 contains configuration values for EigenDAClientV2 +type EigenDAClientConfigV2 struct { + // The blob encoding version to use when writing and reading blobs + BlobEncodingVersion codecs.BlobEncodingVersion + + // If PointVerificationMode is IFFT, then the client codec will do an IFFT on blobs before they are dispersed, and + // will do an FFT on blobs after receiving them. This makes it possible to open points on the KZG commitment to prove + // that the field elements correspond to the commitment. + // + // If PointVerificationMode is NoIFFT, the blob must be supplied in its entirety, to perform a verification + // that any part of the data matches the KZG commitment. + PointVerificationMode VerificationMode +} diff --git a/api/clients/eigenda_client_v2.go b/api/clients/eigenda_client_v2.go new file mode 100644 index 000000000..2c6a2aeda --- /dev/null +++ b/api/clients/eigenda_client_v2.go @@ -0,0 +1,148 @@ +package clients + +import ( + "context" + "errors" + "fmt" + "github.com/Layr-Labs/eigenda/api/clients/codecs" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/cockroachdb/errors/join" + "math/rand" +) + +// EigenDAClientV2 provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser. +// +// This struct is not threadsafe. +type EigenDAClientV2 struct { + log logging.Logger + // doesn't need to be cryptographically secure, as it's only used to distribute load across relays + random *rand.Rand + config *EigenDAClientConfigV2 + codec codecs.BlobCodec + relayClient RelayClient +} + +// BuildEigenDAClientV2 builds an EigenDAClientV2 from config structs. +func BuildEigenDAClientV2( + log logging.Logger, + config *EigenDAClientConfigV2, + relayClientConfig *RelayClientConfig) (*EigenDAClientV2, error) { + + relayClient, err := NewRelayClient(relayClientConfig, log) + if err != nil { + return nil, fmt.Errorf("new relay client: %w", err) + } + + codec, err := createCodec(config) + if err != nil { + return nil, err + } + + return NewEigenDAClientV2(log, rand.New(rand.NewSource(rand.Int63())), config, relayClient, codec) +} + +// NewEigenDAClientV2 assembles an EigenDAClientV2 from subcomponents that have already been constructed and initialized. +func NewEigenDAClientV2( + log logging.Logger, + random *rand.Rand, + config *EigenDAClientConfigV2, + relayClient RelayClient, + codec codecs.BlobCodec) (*EigenDAClientV2, error) { + + return &EigenDAClientV2{ + log: log, + random: random, + config: config, + codec: codec, + relayClient: relayClient, + }, nil +} + +// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate. +// +// The relays are attempted in random order. +// +// The returned blob is decoded. +func (c *EigenDAClientV2) GetBlob( + ctx context.Context, + blobKey corev2.BlobKey, + blobCertificate corev2.BlobCertificate) ([]byte, error) { + + relayKeyCount := len(blobCertificate.RelayKeys) + + if relayKeyCount == 0 { + return nil, errors.New("relay key count is zero") + } + + // create a randomized array of indices, so that it isn't always the first relay in the list which gets hit + indices := c.random.Perm(relayKeyCount) + + // TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond maliciously) + + // iterate over relays in random order, until we are able to get the blob from someone + for _, val := range indices { + relayKey := blobCertificate.RelayKeys[val] + + // TODO: does this need a timeout? + data, err := c.relayClient.GetBlob(ctx, relayKey, blobKey) + + // if GetBlob returned an error, try calling a different relay + if err != nil { + c.log.Warn("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err) + continue + } + + // An honest relay should never send an empty blob + if len(data) == 0 { + c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey) + continue + } + + // An honest relay should never send a blob which cannot be decoded + decodedData, err := c.codec.DecodeBlob(data) + if err != nil { + c.log.Warn("error decoding blob from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err) + continue + } + + return decodedData, nil + } + + return nil, fmt.Errorf("unable to retrieve blob from any relay. relay count: %d", relayKeyCount) +} + +// GetCodec returns the codec the client uses for encoding and decoding blobs +func (c *EigenDAClientV2) GetCodec() codecs.BlobCodec { + return c.codec +} + +// Close is responsible for calling close on all internal clients. This method will do its best to close all internal +// clients, even if some closes fail. +// +// Any and all errors returned from closing internal clients will be joined and returned. +// +// This method should only be called once. +func (c *EigenDAClientV2) Close() error { + relayClientErr := c.relayClient.Close() + + // TODO: this is using join, since there will be more subcomponents requiring closing after adding PUT functionality + return join.Join(relayClientErr) +} + +// createCodec creates the codec based on client config values +func createCodec(config *EigenDAClientConfigV2) (codecs.BlobCodec, error) { + lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.BlobEncodingVersion) + if err != nil { + return nil, fmt.Errorf("create low level codec: %w", err) + } + + switch config.PointVerificationMode { + case NoIFFT: + return codecs.NewNoIFFTCodec(lowLevelCodec), nil + case IFFT: + return codecs.NewIFFTCodec(lowLevelCodec), nil + default: + return nil, fmt.Errorf("unsupported point verification mode: %d", config.PointVerificationMode) + } +} diff --git a/api/clients/eigenda_client_v2_test.go b/api/clients/eigenda_client_v2_test.go new file mode 100644 index 000000000..3a0c59479 --- /dev/null +++ b/api/clients/eigenda_client_v2_test.go @@ -0,0 +1,286 @@ +package clients_test + +import ( + "context" + "fmt" + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/clients/codecs" + codecsmock "github.com/Layr-Labs/eigenda/api/clients/codecs/mock" + clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock" + tu "github.com/Layr-Labs/eigenda/common/testutils" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "math/rand" + "testing" +) + +type ClientV2Tester struct { + ClientV2 *clients.EigenDAClientV2 + MockRelayClient *clientsmock.MockRelayClient + MockCodec *codecsmock.BlobCodec +} + +func (c *ClientV2Tester) assertExpectations(t *testing.T) { + c.MockRelayClient.AssertExpectations(t) + c.MockCodec.AssertExpectations(t) +} + +// buildClientV2Tester sets up a V2 client, with mocks necessary for testing +func buildClientV2Tester(t *testing.T) ClientV2Tester { + tu.InitializeRandom() + logger := logging.NewNoopLogger() + clientConfig := &clients.EigenDAClientConfigV2{} + + mockRelayClient := clientsmock.MockRelayClient{} + mockCodec := codecsmock.BlobCodec{} + + // TODO (litt3): use TestRandom once the PR merges https://github.com/Layr-Labs/eigenda/pull/976 + random := rand.New(rand.NewSource(rand.Int63())) + + client, err := clients.NewEigenDAClientV2( + logger, + random, + clientConfig, + &mockRelayClient, + &mockCodec) + + assert.NotNil(t, client) + assert.Nil(t, err) + + return ClientV2Tester{ + ClientV2: client, + MockRelayClient: &mockRelayClient, + MockCodec: &mockCodec, + } +} + +// TestGetBlobSuccess tests that a blob is received without error in the happy case +func TestGetBlobSuccess(t *testing.T) { + tester := buildClientV2Tester(t) + + blobKey := v2.BlobKey(tu.RandomBytes(32)) + blobBytes := tu.RandomBytes(100) + + relayKeys := make([]v2.RelayKey, 1) + relayKeys[0] = rand.Uint32() + blobCert := v2.BlobCertificate{ + RelayKeys: relayKeys, + } + + tester.MockRelayClient.On("GetBlob", mock.Anything, relayKeys[0], blobKey).Return(blobBytes, nil).Once() + tester.MockCodec.On("DecodeBlob", blobBytes).Return(tu.RandomBytes(50), nil).Once() + + blob, err := tester.ClientV2.GetBlob(context.Background(), blobKey, blobCert) + + assert.NotNil(t, blob) + assert.Nil(t, err) + + tester.assertExpectations(t) +} + +// TestRandomRelayRetries verifies correct behavior when some relays from the certificate do not respond with the blob, +// requiring the client to retry with other relays. +func TestRandomRelayRetries(t *testing.T) { + tester := buildClientV2Tester(t) + + blobKey := v2.BlobKey(tu.RandomBytes(32)) + blobBytes := tu.RandomBytes(100) + + relayCount := 100 + relayKeys := make([]v2.RelayKey, relayCount) + for i := 0; i < relayCount; i++ { + relayKeys[i] = rand.Uint32() + } + blobCert := v2.BlobCertificate{ + RelayKeys: relayKeys, + } + + // for this test, only a single relay is online + // we will be asserting that it takes a different amount of retries to dial this relay, since the array of relay keys to try is randomized + onlineRelayKey := relayKeys[rand.Intn(len(relayKeys))] + + offlineKeyMatcher := func(relayKey v2.RelayKey) bool { return relayKey != onlineRelayKey } + onlineKeyMatcher := func(relayKey v2.RelayKey) bool { return relayKey == onlineRelayKey } + var failedCallCount int + tester.MockRelayClient.On("GetBlob", mock.Anything, mock.MatchedBy(offlineKeyMatcher), blobKey).Return(nil, fmt.Errorf("offline relay")).Run(func(args mock.Arguments) { + failedCallCount++ + }) + tester.MockRelayClient.On("GetBlob", mock.Anything, mock.MatchedBy(onlineKeyMatcher), blobKey).Return(blobBytes, nil) + tester.MockCodec.On("DecodeBlob", mock.Anything).Return(tu.RandomBytes(50), nil) + + // keep track of how many tries various blob retrievals require + // this allows us to assert that there is variability, i.e. that relay call order is actually random + requiredTries := map[int]bool{} + + for i := 0; i < relayCount; i++ { + failedCallCount = 0 + blob, err := tester.ClientV2.GetBlob(context.Background(), blobKey, blobCert) + assert.NotNil(t, blob) + assert.Nil(t, err) + + requiredTries[failedCallCount] = true + } + + // with 100 random tries, with possible values between 1 and 100, we can very confidently assert that there are at least 10 unique values + assert.Greater(t, len(requiredTries), 10) + + tester.assertExpectations(t) +} + +// TestNoRelayResponse tests functionality when none of the relays listed in the blob certificate respond +func TestNoRelayResponse(t *testing.T) { + tester := buildClientV2Tester(t) + + blobKey := v2.BlobKey(tu.RandomBytes(32)) + + relayCount := 10 + relayKeys := make([]v2.RelayKey, relayCount) + for i := 0; i < relayCount; i++ { + relayKeys[i] = rand.Uint32() + } + blobCert := v2.BlobCertificate{ + RelayKeys: relayKeys, + } + + tester.MockRelayClient.On("GetBlob", mock.Anything, mock.Anything, blobKey).Return(nil, fmt.Errorf("offline relay")) + + blob, err := tester.ClientV2.GetBlob(context.Background(), blobKey, blobCert) + assert.Nil(t, blob) + assert.NotNil(t, err) + + tester.assertExpectations(t) +} + +// TestNoRelaysInCert tests that having no relay keys in the cert is handled gracefully +func TestNoRelaysInCert(t *testing.T) { + tester := buildClientV2Tester(t) + + blobKey := v2.BlobKey(tu.RandomBytes(32)) + + // cert has no listed relay keys + blobCert := v2.BlobCertificate{ + RelayKeys: []v2.RelayKey{}, + } + + blob, err := tester.ClientV2.GetBlob(context.Background(), blobKey, blobCert) + assert.Nil(t, blob) + assert.NotNil(t, err) + + tester.assertExpectations(t) +} + +// TestGetBlobReturns0Len verifies that a 0 length blob returned from a relay is handled gracefully, and that the client retries after such a failure +func TestGetBlobReturns0Len(t *testing.T) { + tester := buildClientV2Tester(t) + + blobKey := v2.BlobKey(tu.RandomBytes(32)) + + relayCount := 10 + relayKeys := make([]v2.RelayKey, relayCount) + for i := 0; i < relayCount; i++ { + relayKeys[i] = rand.Uint32() + } + blobCert := v2.BlobCertificate{ + RelayKeys: relayKeys, + } + + // the first GetBlob will return a 0 len blob + tester.MockRelayClient.On("GetBlob", mock.Anything, mock.Anything, blobKey).Return([]byte{}, nil).Once() + // the second call will return random bytes + tester.MockRelayClient.On("GetBlob", mock.Anything, mock.Anything, blobKey).Return(tu.RandomBytes(100), nil).Once() + + tester.MockCodec.On("DecodeBlob", mock.Anything).Return(tu.RandomBytes(50), nil) + + // the call to the first relay will fail with a 0 len blob returned. the call to the second relay will succeed + blob, err := tester.ClientV2.GetBlob(context.Background(), blobKey, blobCert) + assert.NotNil(t, blob) + assert.Nil(t, err) + + tester.assertExpectations(t) +} + +// TestFailedDecoding verifies that a failed blob decode is handled gracefully, and that the client retries after such a failure +func TestFailedDecoding(t *testing.T) { + tester := buildClientV2Tester(t) + + blobKey := v2.BlobKey(tu.RandomBytes(32)) + + relayCount := 10 + relayKeys := make([]v2.RelayKey, relayCount) + for i := 0; i < relayCount; i++ { + relayKeys[i] = rand.Uint32() + } + blobCert := v2.BlobCertificate{ + RelayKeys: relayKeys, + } + + tester.MockRelayClient.On("GetBlob", mock.Anything, mock.Anything, blobKey).Return(tu.RandomBytes(100), nil) + + tester.MockCodec.On("DecodeBlob", mock.Anything).Return(nil, fmt.Errorf("decode failed")).Once() + tester.MockCodec.On("DecodeBlob", mock.Anything).Return(tu.RandomBytes(50), nil).Once() + + // decoding will fail the first time, but succeed the second time + blob, err := tester.ClientV2.GetBlob(context.Background(), blobKey, blobCert) + assert.NotNil(t, blob) + assert.Nil(t, err) + + tester.assertExpectations(t) +} + +// TestErrorFreeClose tests the happy case, where none of the internal closes yield an error +func TestErrorFreeClose(t *testing.T) { + tester := buildClientV2Tester(t) + + tester.MockRelayClient.On("Close").Return(nil).Once() + + err := tester.ClientV2.Close() + assert.Nil(t, err) + + tester.assertExpectations(t) +} + +// TestErrorClose tests what happens when subcomponents throw errors when being closed +func TestErrorClose(t *testing.T) { + tester := buildClientV2Tester(t) + + tester.MockRelayClient.On("Close").Return(fmt.Errorf("close failed")).Once() + + err := tester.ClientV2.Close() + assert.NotNil(t, err) + + tester.assertExpectations(t) +} + +// TestGetCodec checks that the codec used in construction is returned by GetCodec +func TestGetCodec(t *testing.T) { + tester := buildClientV2Tester(t) + + assert.Equal(t, tester.MockCodec, tester.ClientV2.GetCodec()) + + tester.assertExpectations(t) +} + +// TestBuilder tests that the method that builds the client from config doesn't throw any obvious errors +func TestBuilder(t *testing.T) { + clientConfig := &clients.EigenDAClientConfigV2{ + BlobEncodingVersion: codecs.DefaultBlobEncoding, + PointVerificationMode: clients.IFFT, + } + + relayClientConfig := &clients.RelayClientConfig{ + Sockets: make(map[v2.RelayKey]string), + UseSecureGrpcFlag: true, + } + + clientV2, err := clients.BuildEigenDAClientV2( + logging.NewNoopLogger(), + clientConfig, + relayClientConfig) + + assert.NotNil(t, clientV2) + assert.Nil(t, err) + + assert.NotNil(t, clientV2.GetCodec()) +} diff --git a/api/clients/mock/relay_client.go b/api/clients/mock/relay_client.go index e97e1e540..63d5750f4 100644 --- a/api/clients/mock/relay_client.go +++ b/api/clients/mock/relay_client.go @@ -19,7 +19,7 @@ func NewRelayClient() *MockRelayClient { } func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) { - args := c.Called(blobKey) + args := c.Called(ctx, relayKey, blobKey) if args.Get(0) == nil { return nil, args.Error(1) }