diff --git a/api/v1alpha1/vmdiskimage_types.go b/api/v1alpha1/vmdiskimage_types.go
index 51c82aa..c997f1a 100644
--- a/api/v1alpha1/vmdiskimage_types.go
+++ b/api/v1alpha1/vmdiskimage_types.go
@@ -34,20 +34,24 @@ const (
 
 // Condition Reasons
 const (
-	ReasonResourceCreationFailed string = "ResourceCreationFailed"
-	ReasonResouceUpdateFailed    string = "ResourceUpdateFailed"
-	ReasonQueued                 string = "Queued"
-	ReasonSyncing                string = "Syncing"
-	ReasonRetryLimitExceeded     string = "RetryLimitExceeded"
-	ReasonSynced                 string = "Synced"
+	ReasonResourceCreationFailed      string = "ResourceCreationFailed"
+	ReasonResouceUpdateFailed         string = "ResourceUpdateFailed"
+	ReasonQueued                      string = "Queued"
+	ReasonSyncing                     string = "Syncing"
+	ReasonRetryLimitExceeded          string = "RetryLimitExceeded"
+	ReasonMissingSourceArtifact       string = "MissingSourceArtifact"
+	ReasonSyncAttemptDurationExceeded string = "SyncAttemptDurationExceeded"
+	ReasonUnknownSyncFailure          string = "UnknownSyncFailure"
+	ReasonSynced                      string = "Synced"
 )
 
 // CRD phases
 const (
-	PhaseQueued  string = "Queued"
-	PhaseSyncing string = "Syncing"
-	PhaseReady   string = "Ready"
-	PhaseFailed  string = "Failed"
+	PhaseQueued           string = "Queued"
+	PhaseSyncing          string = "Syncing"
+	PhaseReady            string = "Ready"
+	PhaseRetryableFailure string = "RetryableFailure"
+	PhaseFailed           string = "Failed"
 )
 
 // VMDiskImage Labels
@@ -94,7 +98,7 @@ type VMDiskImageStatus struct {
 	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
 	// Important: Run "make" to regenerate code after modifying this file
 
-	// +kubebuilder:validation:Enum=Queued;Syncing;Ready;Failed
+	// +kubebuilder:validation:Enum=Queued;Syncing;Ready;Failed;RetryableFailure
 	Phase string `json:"phase"`
 
 	// A human-readable message providing more details about the current phase.
@@ -103,7 +107,8 @@ type VMDiskImageStatus struct {
 	// Conditions of the VMDiskImage resource.
 	Conditions []metav1.Condition `json:"conditions,omitempty"`
 
-	FailureCount int `json:"failureCount,omitempty"`
+	FailureCount    int          `json:"failureCount,omitempty"`
+	LastFailureTime *metav1.Time `json:"lastFailureTime,omitempty"`
 }
 
 // +kubebuilder:object:root=true
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 93ddcca..db29f87 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -124,6 +124,10 @@ func (in *VMDiskImageStatus) DeepCopyInto(out *VMDiskImageStatus) {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
 	}
+	if in.LastFailureTime != nil {
+		in, out := &in.LastFailureTime, &out.LastFailureTime
+		*out = (*in).DeepCopy()
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VMDiskImageStatus.
diff --git a/charts/data-sync-operator/templates/crd/vmdiskimages.crd.pelotech.ot.yaml b/charts/data-sync-operator/templates/crd/vmdiskimages.crd.pelotech.ot.yaml
index c86c8d0..032c1a7 100644
--- a/charts/data-sync-operator/templates/crd/vmdiskimages.crd.pelotech.ot.yaml
+++ b/charts/data-sync-operator/templates/crd/vmdiskimages.crd.pelotech.ot.yaml
@@ -137,6 +137,9 @@ spec:
                 type: array
               failureCount:
                 type: integer
+              lastFailureTime:
+                format: date-time
+                type: string
               message:
                 description: A human-readable message providing more details about the current phase.
                 type: string
@@ -146,6 +149,7 @@ spec:
                 - Syncing
                 - Ready
                 - Failed
+                - RetryableFailure
                 type: string
             required:
             - phase
diff --git a/config/crd/bases/crd.pelotech.ot_vmdiskimages.yaml b/config/crd/bases/crd.pelotech.ot_vmdiskimages.yaml
index 2188d92..8d60673 100644
--- a/config/crd/bases/crd.pelotech.ot_vmdiskimages.yaml
+++ b/config/crd/bases/crd.pelotech.ot_vmdiskimages.yaml
@@ -140,6 +140,9 @@ spec:
                 type: array
               failureCount:
                 type: integer
+              lastFailureTime:
+                format: date-time
+                type: string
              message:
                description: A human-readable message providing more details about
                  the current phase.
@@ -150,6 +153,7 @@ spec:
                 - Syncing
                 - Ready
                 - Failed
+                - RetryableFailure
                 type: string
             required:
             - phase
diff --git a/dist/install.yaml b/dist/install.yaml
index e43aff7..18b4cc7 100644
--- a/dist/install.yaml
+++ b/dist/install.yaml
@@ -148,6 +148,9 @@ spec:
                 type: array
               failureCount:
                 type: integer
+              lastFailureTime:
+                format: date-time
+                type: string
              message:
                description: A human-readable message providing more details about
                  the current phase.
@@ -158,6 +161,7 @@ spec:
                 - Syncing
                 - Ready
                 - Failed
+                - RetryableFailure
                 type: string
             required:
             - phase
diff --git a/internal/vm-disk-image/config/vmdi-controller-config-reader.go b/internal/vm-disk-image/config/vmdi-controller-config-reader.go
index 3c5cd49..8583781 100644
--- a/internal/vm-disk-image/config/vmdi-controller-config-reader.go
+++ b/internal/vm-disk-image/config/vmdi-controller-config-reader.go
@@ -6,38 +6,44 @@ import (
 )
 
 const (
-	defaultConcurrency     = 10 // TODO: We will need to tune this default
-	defaultRetryLimit      = 2
-	defaultBackoffDuration = 10 * time.Second
-	defaultMaxSyncDuration = 1 * time.Hour
+	defaultConcurrency            = 5 // TODO: We will need to tune this default
+	defaultMaxBackoffDelay        = 1 * time.Hour
+	defaultMaxSyncDuration        = 12 * time.Hour
+	defaultMaxSyncAttemptRetries  = 3
+	defaultMaxSyncAttemptDuration = 1 * time.Hour
 )
 
 type VMDiskImageControllerConfig struct {
-	Concurrency          int
-	RetryLimit           int
-	RetryBackoffDuration time.Duration
-	MaxSyncDuration      time.Duration
+	Concurrency            int
+	MaxBackoffDelay        time.Duration
+	MaxSyncDuration        time.Duration
+	MaxSyncAttemptDuration time.Duration
+	MaxSyncAttemptRetries  int
 }
 
 // This function will allow us to get the required config variables from the environment.
 // Locally this is your "env" and in production these values will come from a configmap
 func LoadVMDIControllerConfigFromEnv() VMDiskImageControllerConfig {
 	// The max amount of VMDIs we can have syncing at one time.
-	concurrency := corecfg.GetIntEnvOrDefault("CONCURRENCY", defaultConcurrency)
+	concurrency := corecfg.GetIntEnvOrDefault("MAX_VMDI_SYNC_CONCURRENCY", defaultConcurrency)
 
-	// How many times we will retry a failed sync.
-	retryLimit := corecfg.GetIntEnvOrDefault("RETRY_LIMIT", defaultRetryLimit)
+	// The longest we will ever wait to retry.
+	maxBackoffDelay := corecfg.GetDurationEnvOrDefault("MAX_SYNC_RETRY_BACKOFF_DURATION", defaultMaxBackoffDelay)
 
-	// How long we want to wait before trying to resync a failed VMDI.
-	retryBackoffDuration := corecfg.GetDurationEnvOrDefault("RETRY_BACKOFF_DURATION", defaultBackoffDuration)
+	// How long we will try to run a sync before we fail it forever.
+	maxSyncDuration := corecfg.GetDurationEnvOrDefault("MAX_SYNC_DURATION", defaultMaxSyncDuration)
 
 	// How long we will let a VMDI sit in syncing status.
-	maxSyncDuration := corecfg.GetDurationEnvOrDefault("MAX_SYNC_DURATION", defaultMaxSyncDuration)
+	maxAttemptDuration := corecfg.GetDurationEnvOrDefault("MAX_SYNC_ATTEMPT_DURATION", defaultMaxSyncAttemptDuration)
+
+	// How many times we will retry on a given attempt.
+	maxSyncAttemptRetries := corecfg.GetIntEnvOrDefault("MAX_SYNC_ATTEMPT_RETRIES", defaultMaxSyncAttemptRetries)
 
 	return VMDiskImageControllerConfig{
-		Concurrency:          concurrency,
-		RetryLimit:           retryLimit,
-		RetryBackoffDuration: retryBackoffDuration,
-		MaxSyncDuration:      maxSyncDuration,
+		Concurrency:            concurrency,
+		MaxBackoffDelay:        maxBackoffDelay,
+		MaxSyncAttemptDuration: maxAttemptDuration,
+		MaxSyncAttemptRetries:  maxSyncAttemptRetries,
+		MaxSyncDuration:        maxSyncDuration,
 	}
 }
diff --git a/internal/vm-disk-image/controller/controller.go b/internal/vm-disk-image/controller/controller.go
index 5675549..7d92cd2 100644
--- a/internal/vm-disk-image/controller/controller.go
+++ b/internal/vm-disk-image/controller/controller.go
@@ -75,13 +75,9 @@ func (r *VMDiskImageReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return r.VMDiskImageOrchestrator.DeleteResource(ctx, &VMDiskImage)
 	}
 
-	resourceHasFinalizer := !crutils.ContainsFinalizer(&VMDiskImage, crdv1.VMDiskImageFinalizer)
-	if resourceHasFinalizer {
-		err := r.AddControllerFinalizer(ctx, &VMDiskImage)
-		if err != nil {
-			return r.HandleResourceUpdateError(ctx, &VMDiskImage, err, "Failed to add finalizer to our resource")
-		}
-
+	resourceMissingFinalizer := !crutils.ContainsFinalizer(&VMDiskImage, crdv1.VMDiskImageFinalizer)
+	if resourceMissingFinalizer {
+		return r.AddControllerFinalizer(ctx, &VMDiskImage)
 	}
 
 	currentPhase := VMDiskImage.Status.Phase
@@ -93,6 +89,8 @@ func (r *VMDiskImageReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return r.AttemptSyncingOfResource(ctx, &VMDiskImage)
 	case crdv1.PhaseSyncing:
 		return r.TransitonFromSyncing(ctx, &VMDiskImage)
+	case crdv1.PhaseRetryableFailure:
+		return r.AttemptRetry(ctx, &VMDiskImage)
 	case crdv1.PhaseReady, crdv1.PhaseFailed:
 		return ctrl.Result{}, nil
 	default:
@@ -112,18 +110,18 @@ func (r *VMDiskImageReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	resourceGenerator := &vmdi.Generator{}
 
 	vmdiProvisioner := vmdi.K8sVMDIProvisioner{
-		Client:            client,
-		ResourceGenerator: resourceGenerator,
-		MaxSyncDuration:   config.MaxSyncDuration,
-		RetryLimit:        config.RetryLimit,
+		Client:                 client,
+		ResourceGenerator:      resourceGenerator,
+		MaxSyncAttemptDuration: config.MaxSyncAttemptDuration,
+		MaxSyncAttemptRetries:  config.MaxSyncAttemptRetries,
 	}
 	orchestrator := vmdi.Orchestrator{
-		Client:       client,
-		Recorder:     mgr.GetEventRecorderFor(crdv1.VMDiskImageControllerName),
-		Provisioner:  vmdiProvisioner,
-		RetryLimit:   config.RetryLimit,
-		RetryBackoff: config.RetryBackoffDuration,
-		SyncLimit:    config.Concurrency,
+		Client:              client,
+		Recorder:            mgr.GetEventRecorderFor(crdv1.VMDiskImageControllerName),
+		Provisioner:         vmdiProvisioner,
+		MaxRetryBackoff:     config.MaxBackoffDelay,
+		MaxSyncTime:         config.MaxSyncDuration,
+		ConcurrentSyncLimit: config.Concurrency,
 	}
 	reconciler := &VMDiskImageReconciler{
 		Scheme: mgr.GetScheme(),
diff --git a/internal/vm-disk-image/service/orchestrator.go b/internal/vm-disk-image/service/orchestrator.go
index 4b8ddd5..87a8116 100644
--- a/internal/vm-disk-image/service/orchestrator.go
+++ b/internal/vm-disk-image/service/orchestrator.go
@@ -2,10 +2,13 @@ package service
 
 import (
 	"context"
+	"errors"
+	"math"
 	crdv1 "pelotech/data-sync-operator/api/v1alpha1"
 	"time"
 
 	types "k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
 	crutils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,34 +22,38 @@ import (
 
 type VMDiskImageOrchestrator interface {
 	GetVMDiskImage(ctx context.Context, namespace types.NamespacedName, vmdi *crdv1.VMDiskImage) error
-	AddControllerFinalizer(ctx context.Context, vmdi *crdv1.VMDiskImage) error
+	AddControllerFinalizer(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error)
 	IndexVMDiskImageByPhase(rawObj client.Object) []string
 	ListVMDiskImagesByPhase(ctx context.Context, phase string) (*crdv1.VMDiskImageList, error)
 	QueueResourceCreation(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error)
 	AttemptSyncingOfResource(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error)
 	TransitonFromSyncing(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error)
+	AttemptRetry(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error)
 	DeleteResource(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error)
-	HandleResourceUpdateError(ctx context.Context, ds *crdv1.VMDiskImage, originalErr error, message string) (ctrl.Result, error)
-	HandleSyncError(ctx context.Context, vmdi *crdv1.VMDiskImage, originalErr error, message string) (ctrl.Result, error)
 }
 
 type Orchestrator struct {
 	client.Client
-	Recorder     record.EventRecorder
-	Provisioner  VMDiskImageProvisioner
-	RetryLimit   int
-	RetryBackoff time.Duration
-	SyncLimit    int
+	Recorder            record.EventRecorder
+	Provisioner         VMDiskImageProvisioner
+	MaxRetryBackoff     time.Duration
+	MaxSyncTime         time.Duration
+	ConcurrentSyncLimit int
 }
 
 func (o Orchestrator) GetVMDiskImage(ctx context.Context, namespace types.NamespacedName, vmdi *crdv1.VMDiskImage) error {
 	return o.Get(ctx, namespace, vmdi)
 }
 
-func (o Orchestrator) AddControllerFinalizer(ctx context.Context, vmdi *crdv1.VMDiskImage) error {
+func (o Orchestrator) AddControllerFinalizer(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error) {
 	crutils.AddFinalizer(vmdi, crdv1.VMDiskImageFinalizer)
-	return o.Update(ctx, vmdi)
+	err := o.Update(ctx, vmdi)
+	if err != nil {
+		return o.HandleResourceUpdateError(ctx, vmdi, err, "Failed to add finalizer to our resource")
+	}
+
+	return ctrl.Result{}, nil
 }
 
 func (o Orchestrator) ListVMDiskImagesByPhase(ctx context.Context, phase string) (*crdv1.VMDiskImageList, error) {
@@ -92,7 +99,7 @@ func (o Orchestrator) QueueResourceCreation(ctx context.Context, vmdi *crdv1.VMD
 
 	o.Recorder.Eventf(vmdi, "Normal", "Queued", "Resource successfully queued for sync orchestration")
 
-	return ctrl.Result{Requeue: true}, nil
+	return ctrl.Result{}, nil
 }
 
 func (o Orchestrator) AttemptSyncingOfResource(
@@ -106,9 +113,9 @@ func (o Orchestrator) AttemptSyncingOfResource(
 		logger.Error(err, "Failed to list syncing resources")
 		return ctrl.Result{}, err
 	}
-	if len(syncingList.Items) >= o.SyncLimit {
-		o.Recorder.Eventf(vmdi, "Normal", "WaitingToSync", "No more than %d VMDiskImages can be syncing at once. Waiting...", o.SyncLimit)
-		return ctrl.Result{RequeueAfter: o.RetryBackoff}, nil
+	if len(syncingList.Items) >= o.ConcurrentSyncLimit {
+		o.Recorder.Eventf(vmdi, "Normal", "WaitingToSync", "No more than %d VMDiskImages can be syncing at once. Waiting...", o.ConcurrentSyncLimit)
+		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
 	}
 
 	err = o.Provisioner.CreateResources(ctx, vmdi)
@@ -141,7 +148,6 @@ func (o Orchestrator) TransitonFromSyncing(ctx context.Context, vmdi *crdv1.VMDi
 	// Check if there is an error occurring in the sync
 	syncError := o.Provisioner.ResourcesHaveErrors(ctx, vmdi)
 	if syncError != nil {
-		logger.Error(syncError, "A sync error has occurred.")
 		return o.HandleSyncError(ctx, vmdi, syncError, "A error has occurred while syncing")
 	}
 
@@ -152,7 +158,7 @@ func (o Orchestrator) TransitonFromSyncing(ctx context.Context, vmdi *crdv1.VMDi
 	}
 	if !isDone {
 		logger.Info("Sync is not complete. Requeuing.")
-		return ctrl.Result{RequeueAfter: o.RetryBackoff}, nil
+		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
 	}
 
 	vmdi.Status.Phase = crdv1.PhaseReady
@@ -172,6 +178,38 @@ func (o Orchestrator) TransitonFromSyncing(ctx context.Context, vmdi *crdv1.VMDi
 	return ctrl.Result{}, nil
 }
 
+func (o Orchestrator) AttemptRetry(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error) {
+	syncDeadline := metav1.NewTime(vmdi.CreationTimestamp.Add(o.MaxSyncTime))
+	exceededSyncDeadline := metav1.Now().After(syncDeadline.Time)
+
+	// Fail forever if we're past the deadline
+	if exceededSyncDeadline {
+		vmdi.Status.Phase = crdv1.PhaseFailed
+		vmdi.Status.Message = "Exceeded overall sync retry window. Failed Permanently"
+
+		if err := o.Status().Update(ctx, vmdi); err != nil {
+			return o.HandleResourceUpdateError(ctx, vmdi, err, "failed to reset VMDI on retry")
+		}
+
+		return ctrl.Result{}, nil
+	}
+
+	// Exponential retry
+	var backoffInterval time.Duration
+	nextBackoffMinutes := int(math.Floor(math.Pow(3, float64(vmdi.Status.FailureCount))))
+	nextBackoffDuration := time.Duration(nextBackoffMinutes) * time.Minute
+	backoffInterval = min(nextBackoffDuration, o.MaxRetryBackoff)
+	timeSinceFailure := time.Since(vmdi.Status.LastFailureTime.Time)
+	// If we haven't waited as long as we need to backoff and requeue
+	if timeSinceFailure < backoffInterval {
+		remaingWaitTime := backoffInterval - timeSinceFailure
+		return ctrl.Result{RequeueAfter: remaingWaitTime}, nil
+	}
+
+	return o.QueueResourceCreation(ctx, vmdi)
+
+}
+
 func (o Orchestrator) DeleteResource(ctx context.Context, vmdi *crdv1.VMDiskImage) (ctrl.Result, error) {
 	logger := logf.FromContext(ctx)
 
@@ -193,7 +231,7 @@ func (o Orchestrator) HandleResourceUpdateError(
 	logger.Error(originalErr, message)
 
 	// Mark the resource as Failed
-	vmdi.Status.Phase = crdv1.PhaseFailed
+	vmdi.Status.Phase = crdv1.PhaseRetryableFailure
 	vmdi.Status.Message = "An error occurred during reconciliation: " + originalErr.Error()
 	meta.SetStatusCondition(&vmdi.Status.Conditions, metav1.Condition{
 		Type:    crdv1.ConditionTypeReady,
@@ -215,7 +253,7 @@ func (o Orchestrator) HandleResourceCreationError(ctx context.Context, vmdi *crd
 	logger.Error(originalErr, "Failed to create a resource.")
 	o.Recorder.Eventf(vmdi, "Warning", "ResourceCreationFailed", "Failed to create resources.")
 
-	vmdi.Status.Phase = crdv1.PhaseFailed
+	vmdi.Status.Phase = crdv1.PhaseRetryableFailure
 	vmdi.Status.Message = "Failed while creating resources: " + originalErr.Error()
 	meta.SetStatusCondition(&vmdi.Status.Conditions, metav1.Condition{
 		Type:    crdv1.ConditionTypeReady,
@@ -243,20 +281,24 @@ func (o Orchestrator) HandleSyncError(ctx context.Context, vmdi *crdv1.VMDiskIma
 	o.Recorder.Eventf(vmdi, "Warning", "SyncErrorOccurred", originalErr.Error())
 
 	vmdi.Status.FailureCount += 1
-	if err := o.Status().Update(ctx, vmdi); err != nil {
-		logger.Error(err, "Failed to update resource failure count")
-	}
-	if vmdi.Status.FailureCount < o.RetryLimit {
-		return ctrl.Result{RequeueAfter: o.RetryBackoff}, nil
+	vmdi.Status.Phase = crdv1.PhaseRetryableFailure
+	vmdi.Status.LastFailureTime = ptr.To(metav1.Now())
+	vmdi.Status.Message = "An error occurred during reconciliation: " + originalErr.Error()
+
+	reason := crdv1.ReasonUnknownSyncFailure
+	switch {
+	case errors.Is(originalErr, ErrSyncAttemptExceedsMaxDuration):
+		reason = crdv1.ReasonSyncAttemptDurationExceeded
+	case errors.Is(originalErr, ErrSyncAttemptExceedsRetries):
+		reason = crdv1.ReasonRetryLimitExceeded
+	case errors.Is(originalErr, ErrMissingSourceArtifact):
+		reason = crdv1.ReasonMissingSourceArtifact
 	}
-	o.Recorder.Eventf(vmdi, "Warning", "SyncExceededRetryCount", "The sync has failed beyond the set retry limit of %d", o.RetryLimit)
-	vmdi.Status.Phase = crdv1.PhaseFailed
-	vmdi.Status.Message = "An error occurred during reconciliation: " + originalErr.Error()
 
 	meta.SetStatusCondition(&vmdi.Status.Conditions, metav1.Condition{
 		Type:    crdv1.ConditionTypeReady,
 		Status:  metav1.ConditionFalse,
-		Reason:  crdv1.ReasonRetryLimitExceeded,
+		Reason:  reason,
 		Message: originalErr.Error(),
 	})
 
@@ -269,5 +311,5 @@ func (o Orchestrator) HandleSyncError(ctx context.Context, vmdi *crdv1.VMDiskIma
 		logger.Error(err, "Failed to teardown resources.")
 	}
 
-	return ctrl.Result{}, originalErr
+	return ctrl.Result{}, nil
 }
diff --git a/internal/vm-disk-image/service/provisioner.go b/internal/vm-disk-image/service/provisioner.go
index 55b2b4f..4e3bcae 100644
--- a/internal/vm-disk-image/service/provisioner.go
+++ b/internal/vm-disk-image/service/provisioner.go
@@ -2,8 +2,10 @@ package service
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	crdv1 "pelotech/data-sync-operator/api/v1alpha1"
+	"strings"
 	"time"
 
 	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
@@ -24,13 +26,17 @@ type VMDiskImageProvisioner interface {
 
 type K8sVMDIProvisioner struct {
 	client.Client
-	ResourceGenerator VMDIResourceGenerator
-	MaxSyncDuration   time.Duration
-	RetryLimit        int
+	ResourceGenerator      VMDIResourceGenerator
+	MaxSyncAttemptDuration time.Duration
+	MaxSyncAttemptRetries  int
 }
 
 const dataVolumeDonePhase = "Succeeded"
 
+var ErrMissingSourceArtifact = errors.New("the requested artifact does not exist")
+var ErrSyncAttemptExceedsRetries = errors.New("the sync attempt has failed beyond the retry limit")
+var ErrSyncAttemptExceedsMaxDuration = errors.New("the sync attempt has lasted beyond its max duration")
+
 // Create resources for a given VMDiskImage. Stops creating them if
 // a single resource fails to create. Does not cleanup after itself
 func (p K8sVMDIProvisioner) CreateResources(
@@ -172,8 +178,8 @@ func (p K8sVMDIProvisioner) ResourcesHaveErrors(
 		// Normal calculation
 		timeSyncing = now.Sub(syncStartTime)
 	}
-	if timeSyncing > p.MaxSyncDuration {
-		return fmt.Errorf("the VMDiskImage %s has been syncing longer than the allowed sync time", vmdi.Name)
+	if timeSyncing > p.MaxSyncAttemptDuration {
+		return ErrSyncAttemptExceedsMaxDuration
 	}
 
 	searchLabels := getLabelsToMatch(vmdi)
@@ -187,9 +193,17 @@ func (p K8sVMDIProvisioner) ResourcesHaveErrors(
 	}
 
 	for _, dv := range dataVolumeList.Items {
-		if dv.Status.RestartCount >= int32(p.RetryLimit) {
-			return fmt.Errorf("a datavolume has restarted more than the max for a sync")
+		for _, cond := range dv.Status.Conditions {
+			missingSourceArtifact := strings.Contains(cond.Message, "404") || strings.Contains(strings.ToLower(cond.Message), "not found")
+			if missingSourceArtifact {
+				return ErrMissingSourceArtifact
+			}
+		}
+
+		if dv.Status.RestartCount >= int32(p.MaxSyncAttemptRetries) {
+			return ErrSyncAttemptExceedsRetries
 		}
+
 	}
 
 	return nil
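
Reference note (not part of the patch): a minimal, standalone sketch of the requeue schedule implied by the new AttemptRetry backoff, i.e. 3^FailureCount minutes clamped to MaxRetryBackoff (the default MAX_SYNC_RETRY_BACKOFF_DURATION of 1h). The backoffFor helper and the printed values are illustrative assumptions, not code from this change.

package main

import (
	"fmt"
	"math"
	"time"
)

// backoffFor mirrors the AttemptRetry calculation: 3^failureCount minutes,
// clamped to the configured maximum retry backoff.
func backoffFor(failureCount int, maxRetryBackoff time.Duration) time.Duration {
	minutes := int(math.Floor(math.Pow(3, float64(failureCount))))
	delay := time.Duration(minutes) * time.Minute
	if delay > maxRetryBackoff {
		return maxRetryBackoff
	}
	return delay
}

func main() {
	maxBackoff := 1 * time.Hour // matches defaultMaxBackoffDelay in the patch
	for n := 1; n <= 6; n++ {
		fmt.Printf("failureCount=%d -> requeue after %s\n", n, backoffFor(n, maxBackoff))
	}
	// Prints 3m, 9m, 27m, then 1h0m0s for every later failure (capped),
	// until the overall MaxSyncTime window measured from CreationTimestamp
	// is exceeded and the resource is marked Failed permanently.
}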