Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node metrics #948

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
362a390
Add metrics to relay.
cody-littley Nov 26, 2024
3b4637e
Incremental progress.
cody-littley Nov 27, 2024
60f015e
Incremental progress.
cody-littley Nov 27, 2024
2d7e9ef
Incremental progress, need running averages.
cody-littley Nov 27, 2024
b8c7d35
Added running average metrics for GetChunks
cody-littley Nov 27, 2024
a6692c4
Merge branch 'master' into relay-metrics
cody-littley Nov 27, 2024
b9d71d6
Documentation
cody-littley Nov 27, 2024
671f0c8
Add time window to metrics doc
cody-littley Nov 27, 2024
4adb7ea
Added GetBlob metrics.
cody-littley Nov 27, 2024
5579a88
Cleanup.
cody-littley Nov 27, 2024
2b84f21
Cleanup test
cody-littley Nov 27, 2024
fb0cad5
Add locking for running average metric.
cody-littley Nov 27, 2024
a2c05cb
Merge branch 'master' into relay-metrics
cody-littley Nov 27, 2024
dfd2925
Add cache metrics.
cody-littley Nov 27, 2024
24f5f5d
Fix test bug
cody-littley Nov 27, 2024
c3adb70
Made suggested change.
cody-littley Dec 3, 2024
5c8c173
Added metrics for v2 DA node.
cody-littley Dec 3, 2024
1795654
Added metrics documentation.
cody-littley Dec 3, 2024
434c6b9
Merge branch 'master' into node-metrics
cody-littley Dec 6, 2024
5c9274c
Revert deletions.
cody-littley Dec 6, 2024
4d4bfe9
Remove documentation.
cody-littley Dec 6, 2024
d9d898c
Reimplement without metrics framework.
cody-littley Dec 6, 2024
8bd8ff1
Cleanup.
cody-littley Dec 6, 2024
2070eee
Stop background thread when metrics are stopped.
cody-littley Dec 6, 2024
cffa884
Revert unintentional change
cody-littley Dec 6, 2024
5c511c9
Made suggested changes.
cody-littley Dec 10, 2024
a15117f
Don't start two metrics servers.
cody-littley Dec 10, 2024
1076a8f
Fix compile issue.
cody-littley Dec 10, 2024
bbf9005
Merge branch 'master' into node-metrics
cody-littley Dec 11, 2024
62ec4f6
Merge branch 'master' into node-metrics
cody-littley Dec 11, 2024
143b798
Enable debug code.
cody-littley Dec 12, 2024
168ded5
Debug
cody-littley Dec 12, 2024
dd21f61
Fix inabox bug.
cody-littley Dec 12, 2024
1bb1404
Made suggested changes.
cody-littley Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion node/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ func NodeMain(ctx *cli.Context) error {
}

// Creates the GRPC server.

// TODO(cody-littley): the metrics server is currently started by eigenmetrics, which is in another repo.
// When we fully remove v1 support, we need to start the metrics server inside the v2 metrics code.
server := nodegrpc.NewServer(config, node, logger, ratelimiter)
serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter)
serverV2, err := nodegrpc.NewServerV2(config, node, logger, ratelimiter, reg)
if err != nil {
return fmt.Errorf("failed to create server v2: %v", err)
}
err = nodegrpc.RunServers(server, serverV2, config, logger)

return err
Expand Down
6 changes: 4 additions & 2 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Config struct {
EnableNodeApi bool
NodeApiPort string
EnableMetrics bool
MetricsPort string
MetricsPort int
OnchainMetricsInterval int64
Timeout time.Duration
RegisterNodeAtStart bool
Expand All @@ -62,6 +62,7 @@ type Config struct {
OverrideStoreDurationBlocks int64
QuorumIDList []core.QuorumID
DbPath string
DBSizePollPeriod time.Duration
LogPath string
PrivateBls string
ID core.OperatorID
Expand Down Expand Up @@ -207,7 +208,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name),
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name),
MetricsPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name),
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
Expand All @@ -218,6 +219,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
OverrideStoreDurationBlocks: ctx.GlobalInt64(flags.OverrideStoreDurationBlocksFlag.Name),
QuorumIDList: ids,
DbPath: ctx.GlobalString(flags.DbPathFlag.Name),
DBSizePollPeriod: ctx.GlobalDuration(flags.MetricsDBSizePollPeriodFlag.Name),
PrivateBls: privateBls,
EthClientConfig: ethClientConfig,
EncoderConfig: kzg.ReadCLIConfig(ctx),
Expand Down
13 changes: 11 additions & 2 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_METRICS"),
}
MetricsPortFlag = cli.StringFlag{
MetricsPortFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-port"),
Usage: "Port at which node listens for metrics calls",
Required: false,
Value: "9091",
Value: 9091,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"),
}
OnchainMetricsIntervalFlag = cli.StringFlag{
Expand Down Expand Up @@ -98,6 +98,14 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "DB_PATH"),
}
MetricsDBSizePollPeriodFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-db-size-poll-period"),
Usage: "The period at which the database size is polled for metrics. " +
"If set to 0, the database size is not polled.",
Required: false,
Value: 10 * time.Minute,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_DB_SIZE_POLL_PERIOD"),
}
// The files for encrypted private keys.
BlsKeyFileFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "bls-key-file"),
Expand Down Expand Up @@ -384,6 +392,7 @@ var optionalFlags = []cli.Flag{
ChunkDownloadTimeoutFlag,
PprofHttpPort,
EnablePprof,
MetricsDBSizePollPeriodFlag,
}

func init() {
Expand Down
175 changes: 175 additions & 0 deletions node/grpc/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package grpc

import (
"github.com/Layr-Labs/eigensdk-go/logging"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"os"
"path/filepath"
"sync/atomic"
"time"
)

const namespace = "eigenda_node"

// MetricsV2 encapsulates metrics for the v2 DA node.
type MetricsV2 struct {
logger logging.Logger

registry *prometheus.Registry
grpcServerOption grpc.ServerOption

storeChunksLatency *prometheus.SummaryVec
storeChunksDataSize *prometheus.GaugeVec

getChunksLatency *prometheus.SummaryVec
getChunksDataSize *prometheus.GaugeVec

dbSize *prometheus.GaugeVec
dbSizePollPeriod time.Duration
dbDir string
isAlive *atomic.Bool
}

// NewV2Metrics creates a new MetricsV2 instance. dbSizePollPeriod is the period at which the database size is polled.
// If set to 0, the database size is not polled.
func NewV2Metrics(
logger logging.Logger,
registry *prometheus.Registry,
dbDir string,
dbSizePollPeriod time.Duration) (*MetricsV2, error) {

// These should be re-enabled once the legacy v1 metrics are removed.
//registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
//registry.MustRegister(collectors.NewGoCollector())

grpcMetrics := grpcprom.NewServerMetrics()
registry.MustRegister(grpcMetrics)
grpcServerOption := grpc.UnaryInterceptor(
grpcMetrics.UnaryServerInterceptor(),
)

storeChunksLatency := promauto.With(registry).NewSummaryVec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may not need these primitive level metrics as gRPC already provides (need to integrate / enable it)

prometheus.SummaryOpts{
Namespace: namespace,
Name: "store_chunks_latency_ms",
Help: "The latency of a StoreChunks() RPC call.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

storeChunksDataSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "store_chunks_data_size_bytes",
Help: "The size of the data requested to be stored by StoreChunks() RPC calls.",
},
[]string{},
)

getChunksLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "get_chunks_latency_ms",
Help: "The latency of a GetChunks() RPC call.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

getChunksDataSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "get_chunks_data_size_bytes",
Help: "The size of the data requested to be retrieved by GetChunks() RPC calls.",
},
[]string{},
)

dbSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "db_size_bytes",
Help: "The size of the leveldb database.",
},
[]string{},
)
isAlive := &atomic.Bool{}
isAlive.Store(true)

return &MetricsV2{
logger: logger,
registry: registry,
grpcServerOption: grpcServerOption,
storeChunksLatency: storeChunksLatency,
storeChunksDataSize: storeChunksDataSize,
getChunksLatency: getChunksLatency,
getChunksDataSize: getChunksDataSize,
dbSize: dbSize,
dbSizePollPeriod: dbSizePollPeriod,
dbDir: dbDir,
isAlive: isAlive,
}, nil
}

// Start starts the metrics server.
func (m *MetricsV2) Start() {
if m.dbSizePollPeriod.Nanoseconds() == 0 {
return
}
go func() {
ticker := time.NewTicker(m.dbSizePollPeriod)

for m.isAlive.Load() {
var size int64
err := filepath.Walk(m.dbDir, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})

if err != nil {
m.logger.Errorf("failed to get database size: %v", err)
} else {
m.dbSize.WithLabelValues().Set(float64(size))
}
<-ticker.C
}
}()

}

// Stop stops the metrics server.
func (m *MetricsV2) Stop() {
m.isAlive.Store(false)
}

// GetGRPCServerOption returns the gRPC server option that enables automatic GRPC metrics collection.
func (m *MetricsV2) GetGRPCServerOption() grpc.ServerOption {
return m.grpcServerOption
}

func (m *MetricsV2) ReportStoreChunksLatency(latency time.Duration) {
m.storeChunksLatency.WithLabelValues().Observe(
float64(latency.Nanoseconds()) / float64(time.Millisecond))
}

func (m *MetricsV2) ReportStoreChunksDataSize(size uint64) {
m.storeChunksDataSize.WithLabelValues().Set(float64(size))
}

func (m *MetricsV2) ReportGetChunksLatency(latency time.Duration) {
m.getChunksLatency.WithLabelValues().Observe(
float64(latency.Nanoseconds()) / float64(time.Millisecond))
}

func (m *MetricsV2) ReportGetChunksDataSize(size int) {
m.getChunksDataSize.WithLabelValues().Set(float64(size))
}
6 changes: 4 additions & 2 deletions node/grpc/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
return errors.New("node V2 server is not configured")
}

serverV2.metrics.Start()

go func() {
for {
addr := fmt.Sprintf("%s:%s", localhost, config.InternalDispersalPort)
Expand All @@ -33,7 +35,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand All @@ -60,7 +62,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand Down
Loading
Loading