add exponential retry mechanism for rpc requests using utility function #593

Open · wants to merge 2 commits into base: main
2 changes: 1 addition & 1 deletion go.mod
@@ -49,7 +49,7 @@ require (
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -131,6 +131,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
78 changes: 28 additions & 50 deletions relayer/application_relayer.go
@@ -30,15 +30,10 @@ import (
const (
// Number of retries to collect signatures from validators
maxRelayerQueryAttempts = 5
// Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
// during relayer signature query routine
signatureRequestRetryWaitPeriodMs = 10_000
)

var (
// Errors
errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint")
)
// Errors
var errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint")

// CheckpointManager stores committed heights in the database
type CheckpointManager interface {
@@ -276,53 +271,36 @@ func (r *ApplicationRelayer) createSignedMessage(
signedWarpMessageBytes hexutil.Bytes
err error
)
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
r.logger.Debug(
"Relayer collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)

err = r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpConfig.QuorumNumerator,
r.signingSubnetID.String(),
)
if err == nil {
warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes)
if err != nil {
r.logger.Error(
"Failed to parse signed warp message",
zap.Error(err),
)
return nil, err
}
return warpMsg, err
}
r.logger.Info(
"Failed to get aggregate signature from node endpoint. Retrying.",
zap.Int("attempt", attempt),
zap.Error(err),
)
if attempt != maxRelayerQueryAttempts {
// Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
// TODO: We may want to consider an exponential back off rather than a uniform sleep period.
time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
}
}
r.logger.Warn(
"Failed to get aggregate signature from node endpoint",
zap.Int("attempts", maxRelayerQueryAttempts),
err = utils.WithMaxRetriesLog(
Collaborator: Just a heads up, the Warp API integration is planned to be deprecated in the near future. It's still reasonable to integrate exponential backoff here for the time being.

func() error {
return r.sourceWarpSignatureClient.CallContext(
context.Background(),
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
r.warpConfig.QuorumNumerator,
r.signingSubnetID.String(),
)
},
maxRelayerQueryAttempts,
r.logger,
"Failed to get aggregate signature from node endpoint.",
zap.String("sourceBlockchainID", r.sourceBlockchain.GetBlockchainID().String()),
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
zap.String("signingSubnetID", r.signingSubnetID.String()),
)
return nil, errFailedToGetAggSig
if err != nil {
return nil, errFailedToGetAggSig
}
warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes)
if err != nil {
r.logger.Error(
"Failed to parse signed warp message",
zap.Error(err),
)
return nil, err
}
return warpMsg, nil
}

//
Expand Down
44 changes: 44 additions & 0 deletions utils/backoff.go
@@ -0,0 +1,44 @@
package utils

import (
"time"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// WithMaxRetriesLog runs the operation until it succeeds or max retries has been reached.
// It uses exponential backoff.
// It optionally logs information if logger is set.
func WithMaxRetriesLog(
Collaborator: Rather than separate with/without log functions, I think we should instead have a single method that takes a logging.Logger and emits warning logs on retries. I can't think of any use cases where we'd want retry attempts to be logged as Warn for some operations, but not logged at all for others.

operation backoff.Operation,
max uint64,
logger logging.Logger,
msg string,
fields ...zapcore.Field,
Collaborator (on lines +19 to +20): I think logging msg and fields on each attempt is not worth the added complexity of having to pass them in as arguments. Rather, WithMaxRetries should emit a generic warning log on each attempt failure, and we can leave it to the caller to construct a cohesive error log in the failure case.

) error {
attempt := uint(1)
expBackOff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), max)
Collaborator: Let's use backoff.WithMaxElapsedTime instead. At present, we choose the number of retries and the delay between each to resolve to a set elapsed time before we emit an error.

Contributor (author): If I understand correctly, you want to use backoff.WithMaxElapsedTime instead of backoff.WithMaxRetries. Instead of giving a max retry count to the utility function, I will give a maxElapsedTime derived from the existing values (numberOfRetries * delayBetweenEach).

notify := func(err error, duration time.Duration) {
if logger == nil {
return
}
fields := append(fields, zap.Uint("attempt", attempt), zap.Error(err), zap.Duration("backoff", duration))
logger.Warn(msg, fields...)
attempt++
}
err := backoff.RetryNotify(operation, expBackOff, notify)
if err != nil && logger != nil {
fields := append(fields, zap.Uint64("attempts", uint64(attempt)), zap.Error(err))
logger.Error(msg, fields...)
}
return err
}

// WithMaxRetries runs the operation until it succeeds or max retries has been reached.
// It uses exponential backoff.
func WithMaxRetries(operation backoff.Operation, max uint64) error {
return WithMaxRetriesLog(operation, max, nil, "")
}
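
Taken together, the review threads above point toward one consolidated helper: it always takes a logging.Logger, emits a generic warning on each failed attempt, and bounds retries by elapsed time rather than attempt count (with a budget derived from the existing constants, e.g. the 10 s the relayer previously spread across signatureRequestRetryWaitPeriodMs). The sketch below only illustrates that direction; the name WithMaxElapsedTime and its signature are assumptions for this discussion, not code that exists in the branch.

```go
package utils

import (
	"time"

	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/cenkalti/backoff/v4"
	"go.uber.org/zap"
)

// WithMaxElapsedTime retries operation with exponential backoff until it
// succeeds or maxElapsedTime has passed, emitting a generic warning log on
// every failed attempt. Hypothetical consolidation of WithMaxRetries and
// WithMaxRetriesLog as suggested by the reviewers; not part of this PR.
func WithMaxElapsedTime(operation backoff.Operation, maxElapsedTime time.Duration, logger logging.Logger) error {
	expBackOff := backoff.NewExponentialBackOff()
	expBackOff.MaxElapsedTime = maxElapsedTime

	attempt := 1
	notify := func(err error, duration time.Duration) {
		// Called by backoff.RetryNotify after each failed attempt, before sleeping.
		logger.Warn(
			"operation failed, retrying",
			zap.Int("attempt", attempt),
			zap.Duration("backoff", duration),
			zap.Error(err),
		)
		attempt++
	}
	return backoff.RetryNotify(operation, expBackOff, notify)
}
```

With this shape, a caller like createSignedMessage would pass r.logger and a time budget, and remain responsible for constructing its own detailed error log when the helper returns a non-nil error.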
55 changes: 55 additions & 0 deletions utils/backoff_test.go
@@ -0,0 +1,55 @@
package utils

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func TestWithMaxRetries(t *testing.T) {
t.Run("NotEnoughRetry", func(t *testing.T) {
retryable := newMockRetryableFn(3)
err := WithMaxRetries(
func() (err error) {
_, err = retryable.Run()
return err
},
2,
)
require.Error(t, err)
})
t.Run("EnoughRetry", func(t *testing.T) {
retryable := newMockRetryableFn(2)
var res bool
err := WithMaxRetries(
func() (err error) {
res, err = retryable.Run()
return err
},
2,
)
require.NoError(t, err)
require.True(t, res)
})
}

type mockRetryableFn struct {
counter uint64
trigger uint64
}

func newMockRetryableFn(trigger uint64) mockRetryableFn {
return mockRetryableFn{
counter: 0,
trigger: trigger,
}
}

func (m *mockRetryableFn) Run() (bool, error) {
if m.counter == m.trigger {
return true, nil
}
m.counter++
return false, errors.New("error")
}
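
The logging variant can be covered in the same style. The sketch below is an additional case, not part of this PR's test file: it reuses newMockRetryableFn from above and assumes the file would also import avalanchego's no-op logger (github.com/ava-labs/avalanchego/utils/logging) and go.uber.org/zap.

```go
func TestWithMaxRetriesLog(t *testing.T) {
	retryable := newMockRetryableFn(1)
	var res bool
	err := WithMaxRetriesLog(
		func() (err error) {
			res, err = retryable.Run()
			return err
		},
		2,
		logging.NoLog{}, // no-op logger; exercises the logging branch without output
		"retryable operation failed",
		zap.String("test", "WithMaxRetriesLog"),
	)
	require.NoError(t, err)
	require.True(t, res)
}
```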
95 changes: 38 additions & 57 deletions vms/evm/subscriber.go
@@ -11,6 +11,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/icm-services/utils"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
@@ -22,7 +23,7 @@ const (
maxClientSubscriptionBuffer = 20000
subscribeRetryTimeout = 1 * time.Second
MaxBlocksPerRequest = 200
rpcMaxRetries = 5
rpcMaxTries = 5
)

// subscriber implements Subscriber
@@ -128,75 +129,55 @@ func (s *subscriber) processBlockRange(
func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) {
var err error
var header *types.Header
attempt := 1
for {
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
if err == nil {
return header, nil
}
s.logger.Warn(
"Failed to get header by number",
zap.String("blockchainID", s.blockchainID.String()),
zap.Int("attempt", attempt),
zap.Error(err),
)
if attempt >= rpcMaxRetries {
return nil, err
}
time.Sleep(subscribeRetryTimeout)
attempt++
err = utils.WithMaxRetriesLog(
func() (err error) {
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
return err
},
rpcMaxTries,
s.logger,
"Failed to get header by number",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
if err != nil {
return nil, err
}
return header, nil
}

// Loops forever iff maxResubscribeAttempts == 0
func (s *subscriber) Subscribe(maxResubscribeAttempts int) error {
// Retry subscribing until successful. Attempt to resubscribe maxResubscribeAttempts times
attempt := 1
for {
// Unsubscribe before resubscribing
// s.sub should only be nil on the first call to Subscribe
if s.sub != nil {
s.sub.Unsubscribe()
}
err := s.subscribe()
if err == nil {
s.logger.Info(
"Successfully subscribed",
zap.String("blockchainID", s.blockchainID.String()),
)
return nil
}

s.logger.Warn(
"Failed to subscribe to node",
zap.Int("attempt", attempt),
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)

if attempt == maxResubscribeAttempts {
break
}

time.Sleep(subscribeRetryTimeout)
attempt++
// Unsubscribe before resubscribing
// s.sub should only be nil on the first call to Subscribe
if s.sub != nil {
s.sub.Unsubscribe()
}

return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts)
err := s.subscribe(uint64(maxResubscribeAttempts))
if err != nil {
return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts)
}
return nil
}

func (s *subscriber) subscribe() error {
sub, err := s.wsClient.SubscribeNewHead(context.Background(), s.headers)
// subscribe retries subscribing until it succeeds or maxSubscribeAttempts is reached.
func (s *subscriber) subscribe(maxSubscribeAttempts uint64) error {
var sub interfaces.Subscription
err := utils.WithMaxRetriesLog(
func() (err error) {
sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers)
return err
},
maxSubscribeAttempts,
s.logger,
"Failed to subscribe to node",
zap.String("blockchainID", s.blockchainID.String()),
)
if err != nil {
s.logger.Error(
"Failed to subscribe to logs",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
return err
}
s.sub = sub

return nil
}
