Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Adding retries for gRPC calls
Browse files Browse the repository at this point in the history
  • Loading branch information
akirillov committed Jul 2, 2020
1 parent c387843 commit cc2c2f4
Show file tree
Hide file tree
Showing 19 changed files with 2,532 additions and 7 deletions.
30 changes: 27 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ required = [
[[prune.project]]
name = "k8s.io/gengo"
unused-packages = false

[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
version = "1.2.0"
10 changes: 9 additions & 1 deletion pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package consts

import "github.com/kubeflow/katib/pkg/util/v1alpha3/env"
import (
"github.com/kubeflow/katib/pkg/util/v1alpha3/env"
"time"
)

const (
// ConfigExperimentSuggestionName is the config name of the
Expand Down Expand Up @@ -35,6 +38,11 @@ const (
// which is used to run healthz check using grpc probe.
DefaultGRPCService = "manager.v1alpha3.Suggestion"

// DefaultGRPCRetryAttempts is the the maximum number of retries for gRPC calls
DefaultGRPCRetryAttempts = 10
// DefaultGRPCRetryPeriod is a fixed period of time between gRPC call retries
DefaultGRPCRetryPeriod = 3 * time.Second

// DefaultKatibNamespaceEnvName is the default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"
// DefaultKatibComposerEnvName is the default env name of katib suggestion composer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"time"

"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -105,7 +108,15 @@ func (g *General) SyncAssignments(
func (g *General) ValidateAlgorithmSettings(instance *suggestionsv1alpha3.Suggestion, e *experimentsv1alpha3.Experiment) error {
logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
endpoint := util.GetAlgorithmEndpoint(instance)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())

callOpts := []grpc_retry.CallOption{
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(consts.DefaultGRPCRetryPeriod)),
grpc_retry.WithMax(consts.DefaultGRPCRetryAttempts),
}
conn, err := grpc.Dial(endpoint, grpc.WithInsecure(),
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(callOpts...)),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(callOpts...)),
)
if err != nil {
return err
}
Expand All @@ -118,6 +129,7 @@ func (g *General) ValidateAlgorithmSettings(instance *suggestionsv1alpha3.Sugges
request := &suggestionapi.ValidateAlgorithmSettingsRequest{
Experiment: g.ConvertExperiment(e),
}

// See https://github.com/grpc/grpc-go/issues/2636
// See https://github.com/grpc/grpc-go/pull/2503
_, err = client.ValidateAlgorithmSettings(ctx, request, grpc.WaitForReady(true))
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller.v1beta1/consts/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package consts

import "github.com/kubeflow/katib/pkg/util/v1beta1/env"
import (
"github.com/kubeflow/katib/pkg/util/v1beta1/env"
"time"
)

const (
// ConfigExperimentSuggestionName is the config name of the
Expand Down Expand Up @@ -35,6 +38,11 @@ const (
// which is used to run healthz check using grpc probe.
DefaultGRPCService = "manager.v1beta1.Suggestion"

// DefaultGRPCRetryAttempts is the the maximum number of retries for gRPC calls
DefaultGRPCRetryAttempts = 10
// DefaultGRPCRetryPeriod is a fixed period of time between gRPC call retries
DefaultGRPCRetryPeriod = 3 * time.Second

// DefaultKatibNamespaceEnvName is the default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"
// DefaultKatibComposerEnvName is the default env name of katib suggestion composer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math"
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -110,7 +112,15 @@ func (g *General) SyncAssignments(
func (g *General) ValidateAlgorithmSettings(instance *suggestionsv1beta1.Suggestion, e *experimentsv1beta1.Experiment) error {
logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
endpoint := util.GetAlgorithmEndpoint(instance)
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())

callOpts := []grpc_retry.CallOption{
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(consts.DefaultGRPCRetryPeriod)),
grpc_retry.WithMax(consts.DefaultGRPCRetryAttempts),
}
conn, err := grpc.Dial(endpoint, grpc.WithInsecure(),
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(callOpts...)),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(callOpts...)),
)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit cc2c2f4

Please sign in to comment.