Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: relax indexedChainState to ChainState for retrieval #943

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/clients/mock/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batch
func (c *MockNodeClient) GetChunks(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
Expand Down
4 changes: 2 additions & 2 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type RetrievedChunks struct {

type NodeClient interface {
GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error)
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.IndexedOperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
}

type client struct {
Expand Down Expand Up @@ -79,7 +79,7 @@ func (c client) GetBlobHeader(
func (c client) GetChunks(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
Expand Down
16 changes: 8 additions & 8 deletions api/clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type BlobChunks struct {

type retrievalClient struct {
logger logging.Logger
indexedChainState core.IndexedChainState
chainState core.ChainState
assignmentCoordinator core.AssignmentCoordinator
nodeClient NodeClient
verifier encoding.Verifier
Expand All @@ -63,15 +63,15 @@ type retrievalClient struct {
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
chainState core.IndexedChainState,
chainState core.ChainState,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
verifier encoding.Verifier,
numConnections int) (RetrievalClient, error) {

return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
indexedChainState: chainState,
chainState: chainState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
verifier: verifier,
Expand Down Expand Up @@ -104,11 +104,11 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
batchRoot [32]byte,
quorumID core.QuorumID) (*BlobChunks, error) {

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
operatorState, err := r.chainState.GetOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
operators, ok := indexedOperatorState.Operators[quorumID]
operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
Expand All @@ -118,7 +118,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
var proof *merkletree.Proof
var proofVerified bool
for opID := range operators {
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
blobHeader, proof, err = r.nodeClient.GetBlobHeader(ctx, opInfo.Socket, batchHeaderHash, blobIndex)
if err != nil {
// try another operator
Expand Down Expand Up @@ -172,7 +172,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
return nil, err
}

assignments, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, blobHeader.Length, quorumHeader)
assignments, info, err := r.assignmentCoordinator.GetAssignments(operatorState, blobHeader.Length, quorumHeader)
if err != nil {
return nil, errors.New("failed to get assignments")
}
Expand All @@ -182,7 +182,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
pool.Submit(func() {
r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan)
})
Expand Down
7 changes: 1 addition & 6 deletions api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ func setup(t *testing.T) {
indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()

ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
if err != nil {
panic("failed to create a new indexed chain state")
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2)
retrievalClient, err = clients.NewRetrievalClient(logger, chainState, coordinator, nodeClient, v, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
Expand Down
44 changes: 39 additions & 5 deletions core/eth/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
socketMap := make(map[core.OperatorID]string)
socket, err := cs.Tx.GetOperatorSocket(ctx, operator)
if err != nil {
return nil, err
}
socketMap[operator] = socket

return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap)

}

Expand All @@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum)
if err != nil {
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap)
}

func (cs *ChainState) GetCurrentBlockNumber() (uint, error) {
Expand All @@ -59,7 +71,26 @@ func (cs *ChainState) GetOperatorSocket(ctx context.Context, blockNumber uint, o
return socket, nil
}

func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) (*core.OperatorState, error) {
// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum
func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) {
socketMap := make(map[core.OperatorID]string)
for _, quorum := range operatorsByQuorum {
for _, op := range quorum {
// if the socket is already in the map, skip
if _, ok := socketMap[op.OperatorID]; ok {
continue
}
socket, err := cs.Tx.GetOperatorSocket(ctx, op.OperatorID)
if err != nil {
return nil, err
}
socketMap[op.OperatorID] = socket
}
}
return socketMap, nil
}

func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32, socketMap map[core.OperatorID]string) (*core.OperatorState, error) {
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
totals := make(map[core.QuorumID]*core.OperatorInfo)

Expand All @@ -69,15 +100,18 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32)

for ind, op := range quorum {
operators[quorumID][op.OperatorID] = &core.OperatorInfo{
Stake: op.Stake,
Index: core.OperatorIndex(ind),
Stake: op.Stake,
Index: core.OperatorIndex(ind),
Socket: socketMap[op.OperatorID],
}
totalStake.Add(totalStake, op.Stake)
}

totals[quorumID] = &core.OperatorInfo{
Stake: totalStake,
Index: core.OperatorIndex(len(quorum)),
// no socket for the total
Socket: "",
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type OperatorInfo struct {
Stake StakeAmount
// Index is the index of the operator within the quorum
Index OperatorIndex
// Socket is the socket address of the operator, in the form "host:port"
Socket string
}

// OperatorState contains information about the current state of operators which is stored in the blockchain state
Expand Down
38 changes: 22 additions & 16 deletions core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,39 @@ func TestOperatorStateHash(t *testing.T) {
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: {
[32]byte{0}: &core.OperatorInfo{
Stake: big.NewInt(12),
Index: uint(2),
Stake: big.NewInt(12),
Index: uint(2),
Socket: "192.168.1.100:8080",
},
[32]byte{1}: &core.OperatorInfo{
Stake: big.NewInt(23),
Index: uint(3),
Stake: big.NewInt(23),
Index: uint(3),
Socket: "127.0.0.1:3000",
},
},
1: {
[32]byte{1}: &core.OperatorInfo{
Stake: big.NewInt(23),
Index: uint(3),
Stake: big.NewInt(23),
Index: uint(3),
Socket: "127.0.0.1:3000",
},
[32]byte{2}: &core.OperatorInfo{
Stake: big.NewInt(34),
Index: uint(4),
Stake: big.NewInt(34),
Index: uint(4),
Socket: "192.168.1.100:8080",
},
},
},
Totals: map[core.QuorumID]*core.OperatorInfo{
0: {
Stake: big.NewInt(35),
Index: uint(2),
Stake: big.NewInt(35),
Index: uint(2),
Socket: "",
},
1: {
Stake: big.NewInt(57),
Index: uint(2),
Stake: big.NewInt(57),
Index: uint(2),
Socket: "",
},
},
BlockNumber: uint(123),
Expand All @@ -50,8 +56,8 @@ func TestOperatorStateHash(t *testing.T) {
assert.NoError(t, err)
q0 := hash1[0]
q1 := hash1[1]
assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:]))
assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
assert.Equal(t, "6098562ea2e61a8f68743f9162b0adc0", hex.EncodeToString(q0[:]))
assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:]))

s2 := core.OperatorState{
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
Expand Down Expand Up @@ -93,6 +99,6 @@ func TestOperatorStateHash(t *testing.T) {
assert.NoError(t, err)
q0 = hash2[0]
q1 = hash2[1]
assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:]))
assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
assert.Equal(t, "dc1bbb0b2b5d20238adfd4bd33661423", hex.EncodeToString(q0[:]))
assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:]))
}
7 changes: 1 addition & 6 deletions inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
return err
}

ics, err := coreindexer.NewIndexedChainState(cs, indexer)
if err != nil {
return err
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, 10)
retrievalClient, err = clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, 10)
if err != nil {
return err
}
Expand Down
41 changes: 4 additions & 37 deletions retriever/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@ import (
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
coreindexer "github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"
"github.com/Layr-Labs/eigenda/retriever"
retrivereth "github.com/Layr-Labs/eigenda/retriever/eth"
"github.com/Layr-Labs/eigenda/retriever/flags"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/urfave/cli"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -98,47 +95,17 @@ func RetrieverMain(ctx *cli.Context) error {
log.Fatalln("could not start tcp listener", err)
}
cs := eth.NewChainState(tx, gethClient)
rpcClient, err := rpc.Dial(config.EthClientConfig.RPCURLs[0])
if err != nil {
log.Fatalln("could not start tcp listener", err)
}

var ics core.IndexedChainState
if config.UseGraph {
logger.Info("Using graph node")

logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)
} else {
logger.Info("Using built-in indexer")

indexer, err := coreindexer.CreateNewIndexer(
&config.IndexerConfig,
gethClient,
rpcClient,
config.EigenDAServiceManagerAddr,
logger,
)
if err != nil {
return err
}
ics, err = coreindexer.NewIndexedChainState(cs, indexer)
if err != nil {
return err
}
}

agn := &core.StdAssignmentCoordinator{}
retrievalClient, err := clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, config.NumConnections)
retrievalClient, err := clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, config.NumConnections)
if err != nil {
log.Fatalln("could not start tcp listener", err)
}

chainClient := retrivereth.NewChainClient(gethClient, logger)
retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, ics, chainClient)
if err = retrieverServiceServer.Start(context.Background()); err != nil {
log.Fatalln("failed to start retriever service server", err)
}
retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, cs, chainClient)
// This only start the metrics server; consider unwrapping the function
retrieverServiceServer.Start(context.Background())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand Down
9 changes: 4 additions & 5 deletions retriever/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Server struct {
config *Config
retrievalClient clients.RetrievalClient
chainClient eth.ChainClient
indexedState core.IndexedChainState
chainState core.ChainState
logger logging.Logger
metrics *Metrics
}
Expand All @@ -28,7 +28,7 @@ func NewServer(
config *Config,
logger logging.Logger,
retrievalClient clients.RetrievalClient,
indexedState core.IndexedChainState,
chainState core.ChainState,
chainClient eth.ChainClient,
) *Server {
metrics := NewMetrics(config.MetricsConfig.HTTPPort, logger)
Expand All @@ -37,15 +37,14 @@ func NewServer(
config: config,
retrievalClient: retrievalClient,
chainClient: chainClient,
indexedState: indexedState,
chainState: chainState,
logger: logger.With("component", "RetrieverServer"),
metrics: metrics,
}
}

func (s *Server) Start(ctx context.Context) error {
func (s *Server) Start(ctx context.Context) {
s.metrics.Start(ctx)
return s.indexedState.Start(ctx)
}

func (s *Server) RetrieveBlob(ctx context.Context, req *pb.BlobRequest) (*pb.BlobReply, error) {
Expand Down
Loading
Loading