Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete secondary VRG before primary #1689

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@ test-util: generate manifests envtest ## Run util tests.
test-util-pvc: generate manifests envtest ## Run util-pvc tests.
go test ./internal/controller/util -coverprofile cover.out -ginkgo.focus PVCS_Util

test-kubeobjects: ## Run kubeobjects tests.
go test ./internal/controller/kubeobjects -coverprofile cover.out -ginkgo.focus Kubeobjects

test-drenv: ## Run drenv tests.
$(MAKE) -C test

Expand Down
62 changes: 51 additions & 11 deletions internal/controller/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *DRPlacementControlReconciler) SetupWithManager(mgr ctrl.Manager) error
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
//
//nolint:funlen,gocognit,gocyclo,cyclop
//nolint:funlen,gocognit,gocyclo,cyclop,nestif
func (r *DRPlacementControlReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues("DRPC", req.NamespacedName, "rid", uuid.New())

Expand Down Expand Up @@ -166,13 +166,21 @@ func (r *DRPlacementControlReconciler) Reconcile(ctx context.Context, req ctrl.R
// then the DRPC should be deleted as well. The least we should do here is to clean up DPRC.
err := r.processDeletion(ctx, drpc, placementObj, logger)
if err != nil {
logger.Info(fmt.Sprintf("Error in deleting DRPC: (%v)", err))

statusErr := r.setDeletionStatusAndUpdate(ctx, drpc)
if statusErr != nil {
err = fmt.Errorf("drpc deletion failed: %w and status update failed: %w", err, statusErr)

return ctrl.Result{}, err
}

// Is this an expected condition?
if rmnutil.IsOperationInProgress(err) {
logger.Info("Deleting DRPC in progress", "reason", err)

return ctrl.Result{Requeue: true}, nil
}

// Unexpected error.
return ctrl.Result{}, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean just let this be returned and remove the block starting from line 189 to 195 inclusive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result will be ERROR log with a stacktrace - we see 18 of these for every delete operation. Since this is an expected condition, it is wrong to treat this as an error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an expected condition, it is wrong to treat this as an error.

Same as my comment above

}

Expand Down Expand Up @@ -724,19 +732,19 @@ func (r *DRPlacementControlReconciler) cleanupVRGs(
return fmt.Errorf("failed to retrieve VRGs. We'll retry later. Error (%w)", err)
}

if !ensureVRGsManagedByDRPC(r.Log, mwu, vrgs, drpc, vrgNamespace) {
return fmt.Errorf("VRG adoption in progress")
// We have to ensure the seconrary VRG is deleted before deleting the primary VRG. This will fail until there
// is no secondary VRG in the vrgs list.
if err := r.ensureVRGsDeleted(mwu, vrgs, drpc, vrgNamespace, rmn.Secondary); err != nil {
return err
}

// delete VRG manifestwork
for _, drClusterName := range rmnutil.DRPolicyClusterNames(drPolicy) {
if err := mwu.DeleteManifestWork(mwu.BuildManifestWorkName(rmnutil.MWTypeVRG), drClusterName); err != nil {
return fmt.Errorf("%w", err)
}
// This will fail until there is no primary VRG in the vrgs list.
if err := r.ensureVRGsDeleted(mwu, vrgs, drpc, vrgNamespace, rmn.Primary); err != nil {
return err
}

if len(vrgs) != 0 {
return fmt.Errorf("waiting for VRGs count to go to zero")
return rmnutil.OperationInProgress("waiting for VRGs count to go to zero")
}

// delete MCVs
Expand All @@ -747,6 +755,38 @@ func (r *DRPlacementControlReconciler) cleanupVRGs(
return nil
}

// ensureVRGsDeleted ensure that seconrary or primary VRGs are deleted. Return an error if a vrg could not be deleted,
// or deletion is in progress. Return nil if vrg of specified type was not found.
func (r *DRPlacementControlReconciler) ensureVRGsDeleted(
mwu rmnutil.MWUtil,
vrgs map[string]*rmn.VolumeReplicationGroup,
drpc *rmn.DRPlacementControl,
vrgNamespace string,
replicationState rmn.ReplicationState,
) error {
var inProgress bool

for cluster, vrg := range vrgs {
if vrg.Spec.ReplicationState == replicationState {
if !ensureVRGsManagedByDRPC(r.Log, mwu, vrgs, drpc, vrgNamespace) {
return rmnutil.OperationInProgress(fmt.Sprintf("%s VRG adoption in progress", replicationState))
}

if err := mwu.DeleteManifestWork(mwu.BuildManifestWorkName(rmnutil.MWTypeVRG), cluster); err != nil {
return fmt.Errorf("failed to delete %s VRG manifestwork for cluster %q: %w", replicationState, cluster, err)
}

inProgress = true
}
}

if inProgress {
return rmnutil.OperationInProgress(fmt.Sprintf("%s VRG manifestwork deletion in progress", replicationState))
}

return nil
}

func (r *DRPlacementControlReconciler) deleteAllManagedClusterViews(
drpc *rmn.DRPlacementControl, clusterNames []string,
) error {
Expand Down
12 changes: 0 additions & 12 deletions internal/controller/kubeobjects/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ func RequestsMapKeyedByName(requestsStruct Requests) map[string]Request {
return requests
}

type RequestProcessingError struct{ string }

type CaptureSpec struct {
//+optional
Name string `json:"name,omitempty"`
Expand Down Expand Up @@ -138,16 +136,6 @@ type Operation struct {
InverseOp string `json:"inverseOp,omitempty"`
}

func RequestProcessingErrorCreate(s string) RequestProcessingError { return RequestProcessingError{s} }
func (e RequestProcessingError) Error() string { return e.string }

// Called by errors.Is() to match target.
func (RequestProcessingError) Is(target error) bool {
_, ok := target.(RequestProcessingError)

return ok
}

type RequestsManager interface {
ProtectsPath() string
RecoversPath() string
Expand Down
30 changes: 0 additions & 30 deletions internal/controller/kubeobjects/requests_test.go

This file was deleted.

13 changes: 7 additions & 6 deletions internal/controller/kubeobjects/velero/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-logr/logr"
pkgerrors "github.com/pkg/errors"
"github.com/ramendr/ramen/internal/controller/kubeobjects"
"github.com/ramendr/ramen/internal/controller/util"
velero "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -249,11 +250,11 @@ func backupDummyStatusProcessAndRestore(
velero.BackupPhaseUploading,
velero.BackupPhaseUploadingPartialFailure,
velero.BackupPhaseDeleting:
return nil, kubeobjects.RequestProcessingErrorCreate("backup" + string(backup.Status.Phase))
return nil, util.OperationInProgress("backup" + string(backup.Status.Phase))
case velero.BackupPhaseFailedValidation:
return nil, errors.New("backup" + string(backup.Status.Phase))
default:
return nil, kubeobjects.RequestProcessingErrorCreate("backup.status.phase absent")
return nil, util.OperationInProgress("backup.status.phase absent")
}
}

Expand Down Expand Up @@ -283,13 +284,13 @@ func restoreStatusProcess(
return nil
case velero.RestorePhaseNew,
velero.RestorePhaseInProgress:
return kubeobjects.RequestProcessingErrorCreate("restore" + string(restore.Status.Phase))
return util.OperationInProgress("restore" + string(restore.Status.Phase))
case velero.RestorePhaseFailed,
velero.RestorePhaseFailedValidation,
velero.RestorePhasePartiallyFailed:
return errors.New("restore" + string(restore.Status.Phase))
default:
return kubeobjects.RequestProcessingErrorCreate("restore.status.phase absent")
return util.OperationInProgress("restore.status.phase absent")
}
}

Expand Down Expand Up @@ -399,13 +400,13 @@ func backupRealStatusProcess(
velero.BackupPhaseUploading,
velero.BackupPhaseUploadingPartialFailure,
velero.BackupPhaseDeleting:
return kubeobjects.RequestProcessingErrorCreate("backup" + string(backup.Status.Phase))
return util.OperationInProgress("backup" + string(backup.Status.Phase))
case velero.BackupPhaseFailedValidation,
velero.BackupPhasePartiallyFailed,
velero.BackupPhaseFailed:
return errors.New("backup" + string(backup.Status.Phase))
default:
return kubeobjects.RequestProcessingErrorCreate("backup.status.phase absent")
return util.OperationInProgress("backup.status.phase absent")
}
}

Expand Down
24 changes: 24 additions & 0 deletions internal/controller/util/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// SPDX-FileCopyrightText: The RamenDR authors
// SPDX-License-Identifier: Apache-2.0package util

package util

import "errors"

// OperationInProgress error is return when an operation is in progress and we wait for the desired state. The error
// string should describe the operation for logging the error.
type OperationInProgress string

func (e OperationInProgress) Error() string { return string(e) }

// Called by errors.Is() to match target.
func (OperationInProgress) Is(target error) bool {
_, ok := target.(OperationInProgress)

return ok
}

// IsOperationInProgress returns true if err or error wrapped by it is an OperationInProgress error.
func IsOperationInProgress(err error) bool {
return errors.Is(err, OperationInProgress(""))
Copy link
Member

@BenamarMk BenamarMk Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One question though, is this call going to propagate back to line 15? I doubt it. errors.Is(...) is a free function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it does here:

if x, ok := err.(interface{ Is(error) bool }); ok && x.Is(target) {
			return true
		}

All good

}
33 changes: 33 additions & 0 deletions internal/controller/util/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// SPDX-FileCopyrightText: The RamenDR authors
// SPDX-License-Identifier: Apache-2.0

package util_test

import (
"errors"
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/ramendr/ramen/internal/controller/util"
)

var _ = Describe("OperationInProgress", func() {
Context("comparing errors", func() {
err := util.OperationInProgress("this operation")

It("is not equal", func() {
Expect(err == util.OperationInProgress("other operation")).To(Equal(false))
})
It("match error", func() {
Expect(util.IsOperationInProgress(err)).To(Equal(true))
})
It("match wrapped error", func() {
wrapped := fmt.Errorf("wrapping operation in progress: %w", err)
Expect(util.IsOperationInProgress(wrapped)).To(Equal(true))
})
It("does not match other errors", func() {
Expect(util.IsOperationInProgress(errors.New("other error"))).To(Equal(false))
})
})
})
4 changes: 2 additions & 2 deletions internal/controller/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (v *VRGInstance) kubeObjectsGroupCapture(
continue
}

if errors.Is(err, kubeobjects.RequestProcessingError{}) {
if util.IsOperationInProgress(err) {
log1.Info("Kube objects group capturing", "state", err.Error())

continue
Expand Down Expand Up @@ -695,7 +695,7 @@ func (v *VRGInstance) executeRecoverGroup(result *ctrl.Result, s3StoreAccessor s
}
}

if errors.Is(err, kubeobjects.RequestProcessingError{}) {
if util.IsOperationInProgress(err) {
log1.Info("Kube objects group recovering", "state", err.Error())

return err
Expand Down
Loading