Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node metrics #948

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
362a390
Add metrics to relay.
cody-littley Nov 26, 2024
3b4637e
Incremental progress.
cody-littley Nov 27, 2024
60f015e
Incremental progress.
cody-littley Nov 27, 2024
2d7e9ef
Incremental progress, need running averages.
cody-littley Nov 27, 2024
b8c7d35
Added running average metrics for GetChunks
cody-littley Nov 27, 2024
a6692c4
Merge branch 'master' into relay-metrics
cody-littley Nov 27, 2024
b9d71d6
Documentation
cody-littley Nov 27, 2024
671f0c8
Add time window to metrics doc
cody-littley Nov 27, 2024
4adb7ea
Added GetBlob metrics.
cody-littley Nov 27, 2024
5579a88
Cleanup.
cody-littley Nov 27, 2024
2b84f21
Cleanup test
cody-littley Nov 27, 2024
fb0cad5
Add locking for running average metric.
cody-littley Nov 27, 2024
a2c05cb
Merge branch 'master' into relay-metrics
cody-littley Nov 27, 2024
dfd2925
Add cache metrics.
cody-littley Nov 27, 2024
24f5f5d
Fix test bug
cody-littley Nov 27, 2024
c3adb70
Made suggested change.
cody-littley Dec 3, 2024
5c8c173
Added metrics for v2 DA node.
cody-littley Dec 3, 2024
1795654
Added metrics documentation.
cody-littley Dec 3, 2024
434c6b9
Merge branch 'master' into node-metrics
cody-littley Dec 6, 2024
5c9274c
Revert deletions.
cody-littley Dec 6, 2024
4d4bfe9
Remove documentation.
cody-littley Dec 6, 2024
d9d898c
Reimplement without metrics framework.
cody-littley Dec 6, 2024
8bd8ff1
Cleanup.
cody-littley Dec 6, 2024
2070eee
Stop background thread when metrics are stopped.
cody-littley Dec 6, 2024
cffa884
Revert unintentional change
cody-littley Dec 6, 2024
5c511c9
Made suggested changes.
cody-littley Dec 10, 2024
a15117f
Don't start two metrics servers.
cody-littley Dec 10, 2024
1076a8f
Fix compile issue.
cody-littley Dec 10, 2024
bbf9005
Merge branch 'master' into node-metrics
cody-littley Dec 11, 2024
62ec4f6
Merge branch 'master' into node-metrics
cody-littley Dec 11, 2024
143b798
Enable debug code.
cody-littley Dec 12, 2024
168ded5
Debug
cody-littley Dec 12, 2024
dd21f61
Fix inabox bug.
cody-littley Dec 12, 2024
1bb1404
Made suggested changes.
cody-littley Dec 12, 2024
864e7d0
Made suggested changes.
cody-littley Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"bytes"
"crypto/sha256"
"time"
"unsafe"

"github.com/fxamacker/cbor/v2"
Expand Down Expand Up @@ -62,3 +63,9 @@ func DecodeFromBytes[T any](b []byte) (T, error) {
}
return t, nil
}

// ToMilliseconds expresses the given duration as a floating point number of milliseconds. Whereas
// duration.Milliseconds() yields an integer millisecond count, the value returned here keeps the
// sub-millisecond fraction (limited only by the precision of float64).
func ToMilliseconds(duration time.Duration) float64 {
	// A time.Duration is an int64 nanosecond count, so a direct conversion yields nanoseconds.
	const nanosPerMilli = float64(time.Millisecond)
	nanos := float64(duration)
	return nanos / nanosPerMilli
}
13 changes: 7 additions & 6 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package apiserver

import (
"github.com/Layr-Labs/eigenda/common"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -116,15 +117,15 @@ func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 {
}

func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) {
m.getBlobCommitmentLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.getBlobCommitmentLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *metricsV2) reportGetPaymentStateLatency(duration time.Duration) {
m.getPaymentStateLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.getPaymentStateLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) {
m.disperseBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.disperseBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *metricsV2) reportDisperseBlobSize(size int) {
Expand All @@ -133,13 +134,13 @@ func (m *metricsV2) reportDisperseBlobSize(size int) {

func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) {
m.validateDispersalRequestLatency.WithLabelValues().Observe(
float64(duration.Nanoseconds()) / float64(time.Millisecond))
common.ToMilliseconds(duration))
}

func (m *metricsV2) reportStoreBlobLatency(duration time.Duration) {
m.storeBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.storeBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *metricsV2) reportGetBlobStatusLatency(duration time.Duration) {
m.getBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.getBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}
37 changes: 19 additions & 18 deletions disperser/controller/dispatcher_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"github.com/Layr-Labs/eigenda/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"time"
Expand Down Expand Up @@ -248,77 +249,77 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
}

func (m *dispatcherMetrics) reportHandleBatchLatency(duration time.Duration) {
m.handleBatchLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.handleBatchLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportNewBatchLatency(duration time.Duration) {
m.newBatchLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.newBatchLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportGetBlobMetadataLatency(duration time.Duration) {
m.getBlobMetadataLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.getBlobMetadataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportGetOperatorStateLatency(duration time.Duration) {
m.getOperatorStateLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.getOperatorStateLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportGetBlobCertificatesLatency(duration time.Duration) {
m.getBlobCertificatesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.getBlobCertificatesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportBuildMerkleTreeLatency(duration time.Duration) {
m.buildMerkleTreeLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.buildMerkleTreeLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportPutBatchHeaderLatency(duration time.Duration) {
m.putBatchHeaderLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.putBatchHeaderLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportProofLatency(duration time.Duration) {
m.proofLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.proofLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportPutVerificationInfosLatency(duration time.Duration) {
m.putVerificationInfosLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.putVerificationInfosLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportPoolSubmissionLatency(duration time.Duration) {
m.poolSubmissionLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.poolSubmissionLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportPutDispersalRequestLatency(duration time.Duration) {
m.putDispersalRequestLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.putDispersalRequestLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportSendChunksLatency(duration time.Duration) {
m.sendChunksLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.sendChunksLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportSendChunksRetryCount(retries float64) {
m.sendChunksRetryCount.WithLabelValues().Set(retries)
}

func (m *dispatcherMetrics) reportPutDispersalResponseLatency(duration time.Duration) {
m.putDispersalResponseLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.putDispersalResponseLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportHandleSignaturesLatency(duration time.Duration) {
m.handleSignaturesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.handleSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportReceiveSignaturesLatency(duration time.Duration) {
m.receiveSignaturesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.receiveSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportAggregateSignaturesLatency(duration time.Duration) {
m.aggregateSignaturesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.aggregateSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportPutAttestationLatency(duration time.Duration) {
m.putAttestationLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.putAttestationLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duration) {
m.updateBatchStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.updateBatchStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}
11 changes: 6 additions & 5 deletions disperser/controller/encoding_manager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
common "github.com/Layr-Labs/eigenda/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"time"
Expand Down Expand Up @@ -123,23 +124,23 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe
}

func (m *encodingManagerMetrics) reportBatchSubmissionLatency(duration time.Duration) {
m.batchSubmissionLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.batchSubmissionLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportBlobHandleLatency(duration time.Duration) {
m.blobHandleLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.blobHandleLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportEncodingLatency(duration time.Duration) {
m.encodingLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.encodingLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportPutBlobCertLatency(duration time.Duration) {
m.putBlobCertLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.putBlobCertLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportUpdateBlobStatusLatency(duration time.Duration) {
m.updateBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
m.updateBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportBatchSize(size int) {
Expand Down
8 changes: 7 additions & 1 deletion node/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ func NodeMain(ctx *cli.Context) error {
}

// Creates the GRPC server.

// TODO(cody-littley): the metrics server is currently started by eigenmetrics, which is in another repo.
// When we fully remove v1 support, we need to start the metrics server inside the v2 metrics code.
server := nodegrpc.NewServer(config, node, logger, ratelimiter)
serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter)
serverV2, err := nodegrpc.NewServerV2(config, node, logger, ratelimiter, reg)
if err != nil {
return fmt.Errorf("failed to create server v2: %v", err)
}
err = nodegrpc.RunServers(server, serverV2, config, logger)

return err
Expand Down
4 changes: 2 additions & 2 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Config struct {
EnableNodeApi bool
NodeApiPort string
EnableMetrics bool
MetricsPort string
MetricsPort int
OnchainMetricsInterval int64
Timeout time.Duration
RegisterNodeAtStart bool
Expand Down Expand Up @@ -207,7 +207,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name),
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name),
MetricsPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name),
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
Expand Down
4 changes: 2 additions & 2 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_METRICS"),
}
MetricsPortFlag = cli.StringFlag{
MetricsPortFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-port"),
Usage: "Port at which node listens for metrics calls",
Required: false,
Value: "9091",
Value: 9091,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"),
}
OnchainMetricsIntervalFlag = cli.StringFlag{
Expand Down
111 changes: 111 additions & 0 deletions node/grpc/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package grpc

import (
	"fmt"
	"time"

	"github.com/Layr-Labs/eigenda/common"
	"github.com/Layr-Labs/eigensdk-go/logging"
	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
)

// namespace is the Prometheus namespace applied to every metric defined in this file.
const namespace = "eigenda_node"

// MetricsV2 encapsulates metrics for the v2 DA node.
type MetricsV2 struct {
// logger is the logger supplied to NewV2Metrics.
logger logging.Logger

// registry is the Prometheus registry that the collectors below are registered with.
registry *prometheus.Registry
// grpcServerOption installs the unary interceptor that collects the automatic gRPC server metrics.
grpcServerOption grpc.ServerOption

// storeChunksLatency summarizes the latency of StoreChunks() RPC calls, in milliseconds.
storeChunksLatency *prometheus.SummaryVec
// storeChunksRequestSize holds the most recently reported StoreChunks() request size, in bytes.
storeChunksRequestSize *prometheus.GaugeVec

// getChunksLatency summarizes the latency of GetChunks() RPC calls, in milliseconds.
getChunksLatency *prometheus.SummaryVec
// getChunksDataSize holds the most recently reported GetChunks() data size, in bytes.
getChunksDataSize *prometheus.GaugeVec
}

// NewV2Metrics creates a new MetricsV2 instance. dbSizePollPeriod is the period at which the database size is polled.
// If set to 0, the database size is not polled.
func NewV2Metrics(logger logging.Logger, registry *prometheus.Registry) (*MetricsV2, error) {

// These should be re-enabled once the legacy v1 metrics are removed.
//registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
//registry.MustRegister(collectors.NewGoCollector())

grpcMetrics := grpcprom.NewServerMetrics()
registry.MustRegister(grpcMetrics)
grpcServerOption := grpc.UnaryInterceptor(
grpcMetrics.UnaryServerInterceptor(),
)

storeChunksLatency := promauto.With(registry).NewSummaryVec(
cody-littley marked this conversation as resolved.
Show resolved Hide resolved
prometheus.SummaryOpts{
Namespace: namespace,
Name: "store_chunks_latency_ms",
Help: "The latency of a StoreChunks() RPC call.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

storeChunksRequestSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "store_chunks_request_size_bytes",
Help: "The size of the data requested to be stored by StoreChunks() RPC calls.",
},
[]string{},
)

getChunksLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "get_chunks_latency_ms",
Help: "The latency of a GetChunks() RPC call.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

getChunksDataSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "get_chunks_data_size_bytes",
Help: "The size of the data requested to be retrieved by GetChunks() RPC calls.",
},
[]string{},
)

return &MetricsV2{
logger: logger,
registry: registry,
grpcServerOption: grpcServerOption,
storeChunksLatency: storeChunksLatency,
storeChunksRequestSize: storeChunksRequestSize,
getChunksLatency: getChunksLatency,
getChunksDataSize: getChunksDataSize,
}, nil
}

// GetGRPCServerOption exposes the gRPC server option held by this MetricsV2 instance. Supplying it when
// constructing a gRPC server enables automatic gRPC metrics collection.
func (m *MetricsV2) GetGRPCServerOption() grpc.ServerOption {
	option := m.grpcServerOption
	return option
}

// ReportStoreChunksLatency records the latency of a StoreChunks() call in the store chunks latency
// summary. The duration is converted to (fractional) milliseconds before being observed.
func (m *MetricsV2) ReportStoreChunksLatency(latency time.Duration) {
	summary := m.storeChunksLatency.WithLabelValues()
	millis := common.ToMilliseconds(latency)
	summary.Observe(millis)
}

// ReportStoreChunksRequestSize sets the store chunks request size gauge to the given size, replacing
// whatever value was previously reported.
func (m *MetricsV2) ReportStoreChunksRequestSize(size uint64) {
	gauge := m.storeChunksRequestSize.WithLabelValues()
	gauge.Set(float64(size))
}

// ReportGetChunksLatency records the latency of a GetChunks() call in the get chunks latency summary.
// The duration is converted to (fractional) milliseconds before being observed.
func (m *MetricsV2) ReportGetChunksLatency(latency time.Duration) {
	summary := m.getChunksLatency.WithLabelValues()
	millis := common.ToMilliseconds(latency)
	summary.Observe(millis)
}

// ReportGetChunksDataSize sets the get chunks data size gauge to the given size, replacing whatever
// value was previously reported.
func (m *MetricsV2) ReportGetChunksDataSize(size int) {
	gauge := m.getChunksDataSize.WithLabelValues()
	gauge.Set(float64(size))
}
4 changes: 2 additions & 2 deletions node/grpc/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand All @@ -60,7 +60,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand Down
Loading
Loading