diff --git a/api/v1/mdb/mongodb_types.go b/api/v1/mdb/mongodb_types.go index a009deaa3..199146431 100644 --- a/api/v1/mdb/mongodb_types.go +++ b/api/v1/mdb/mongodb_types.go @@ -243,6 +243,9 @@ func GetLastAdditionalMongodConfigByType(lastSpec *MongoDbSpec, configType Addit // GetLastAdditionalMongodConfigByType returns the last successfully achieved AdditionalMongodConfigType for the given component. func (m *MongoDB) GetLastAdditionalMongodConfigByType(configType AdditionalMongodConfigType) (*AdditionalMongodConfig, error) { + if m.Spec.GetResourceType() == ReplicaSet { + panic(errors.Errorf("this method cannot be used from ReplicaSet controller; use non-method GetLastAdditionalMongodConfigByType and pass lastSpec from the deployment state.")) + } if m.Spec.GetResourceType() == ShardedCluster { panic(errors.Errorf("this method cannot be used from ShardedCluster controller; use non-method GetLastAdditionalMongodConfigByType and pass lastSpec from the deployment state.")) } diff --git a/controllers/om/deployment/testing_utils.go b/controllers/om/deployment/testing_utils.go index d62a1aca2..79f0ebf82 100644 --- a/controllers/om/deployment/testing_utils.go +++ b/controllers/om/deployment/testing_utils.go @@ -26,7 +26,7 @@ func CreateFromReplicaSet(mongoDBImage string, forceEnterprise bool, rs *mdb.Mon ), zap.S()) d := om.NewDeployment() - lastConfig, err := rs.GetLastAdditionalMongodConfigByType(mdb.ReplicaSetConfig) + lastConfig, err := mdb.GetLastAdditionalMongodConfigByType(nil, mdb.ReplicaSetConfig) if err != nil { panic(err) } diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index bb30016a9..378161e5c 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -64,7 +64,9 @@ import ( "github.com/mongodb/mongodb-kubernetes/pkg/vault/vaultwatcher" ) -// ReconcileMongoDbReplicaSet reconciles a MongoDB with a type of ReplicaSet +// ReconcileMongoDbReplicaSet reconciles a MongoDB with a type of ReplicaSet. +// WARNING: do not put any mutable state into this struct. +// Controller runtime uses and shares a single instance of it. type ReconcileMongoDbReplicaSet struct { *ReconcileCommonController omConnectionFactory om.ConnectionFactory @@ -76,58 +78,115 @@ type ReconcileMongoDbReplicaSet struct { databaseNonStaticImageVersion string } +type ReplicaSetDeploymentState struct { + LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"` +} + var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} -func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { - return &ReconcileMongoDbReplicaSet{ - ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), - omConnectionFactory: omFunc, - imageUrls: imageUrls, - forceEnterprise: forceEnterprise, - enableClusterMongoDBRoles: enableClusterMongoDBRoles, +// ReplicaSetReconcilerHelper contains state and logic for a SINGLE reconcile execution. +// This object is NOT shared between reconcile invocations. 
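+// +// Typical flow (a sketch mirroring the Reconcile method below): the shared reconciler builds a fresh helper for every request and delegates the whole run to it: +// +// helper, err := r.newReconcilerHelper(ctx, rs, log) +// if err != nil { return r.updateStatus(ctx, rs, workflow.Failed(err), log) } +// return helper.Reconcile(ctx)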
+type ReplicaSetReconcilerHelper struct { + resource *mdbv1.MongoDB + deploymentState *ReplicaSetDeploymentState + reconciler *ReconcileMongoDbReplicaSet + log *zap.SugaredLogger +} - initDatabaseNonStaticImageVersion: initDatabaseNonStaticImageVersion, - databaseNonStaticImageVersion: databaseNonStaticImageVersion, +func (r *ReconcileMongoDbReplicaSet) newReconcilerHelper( + ctx context.Context, + rs *mdbv1.MongoDB, + log *zap.SugaredLogger, +) (*ReplicaSetReconcilerHelper, error) { + helper := &ReplicaSetReconcilerHelper{ + resource: rs, + reconciler: r, + log: log, + } + + if err := helper.initialize(ctx); err != nil { + return nil, err } + + return helper, nil } -type deploymentOptionsRS struct { - agentCertPath string - agentCertHash string - prometheusCertHash string - currentAgentAuthMode string +// readState abstracts reading the state of the resource that we store on the cluster between reconciliations. +func (h *ReplicaSetReconcilerHelper) readState() (*ReplicaSetDeploymentState, error) { + // Try to get the last achieved spec from annotations and store it in state + if lastAchievedSpec, err := h.resource.GetLastSpec(); err != nil { + return nil, err + } else { + return &ReplicaSetDeploymentState{LastAchievedSpec: lastAchievedSpec}, nil + } } -// Generic Kubernetes Resources -// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=list;watch,namespace=placeholder -// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch,namespace=placeholder -// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update,namespace=placeholder -// +kubebuilder:rbac:groups=core,resources={secrets,configmaps},verbs=get;list;watch;create;delete;update,namespace=placeholder -// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;get;list;watch;delete;update,namespace=placeholder +// writeState abstracts writing the state of the resource that we store on the cluster between reconciliations. 
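+// Currently the state is serialized into the resource annotations (plus Vault secret metadata annotations when the Vault backend is enabled).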
+func (h *ReplicaSetReconcilerHelper) writeState(ctx context.Context) error { + // Serialize the state to annotations + annotationsToAdd, err := getAnnotationsForResource(h.resource) + if err != nil { + return err + } -// MongoDB Resource -// +kubebuilder:rbac:groups=mongodb.com,resources={mongodb,mongodb/status,mongodb/finalizers},verbs=*,namespace=placeholder + // Add vault annotations if needed + if vault.IsVaultSecretBackend() { + secrets := h.resource.GetSecretsMountedIntoDBPod() + vaultMap := make(map[string]string) + for _, s := range secrets { + path := fmt.Sprintf("%s/%s/%s", h.reconciler.VaultClient.DatabaseSecretMetadataPath(), h.resource.Namespace, s) + vaultMap = merge.StringToStringMap(vaultMap, h.reconciler.VaultClient.GetSecretAnnotation(path)) + } + path := fmt.Sprintf("%s/%s/%s", h.reconciler.VaultClient.OperatorScretMetadataPath(), h.resource.Namespace, h.resource.Spec.Credentials) + vaultMap = merge.StringToStringMap(vaultMap, h.reconciler.VaultClient.GetSecretAnnotation(path)) + for k, val := range vaultMap { + annotationsToAdd[k] = val + } + } -// Setting up a webhook -// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=get;create;update;delete + // Write annotations back to the resource + if err := annotations.SetAnnotations(ctx, h.resource, annotationsToAdd, h.reconciler.client); err != nil { + return err + } -// Certificate generation -// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests,verbs=get;create;list;watch + h.log.Debugf("Successfully wrote deployment state for ReplicaSet %s/%s", h.resource.Namespace, h.resource.Name) + return nil +} -// Reconcile reads that state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read -// and what is in the MongoDbReplicaSet.Spec -func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) { - // === 1. Initial Checks and setup - log := zap.S().With("ReplicaSet", request.NamespacedName) - rs := &mdbv1.MongoDB{} +func (h *ReplicaSetReconcilerHelper) initialize(ctx context.Context) error { + state, err := h.readState() + if err != nil { + return xerrors.Errorf("Failed to initialize replica set state: %w", err) + } + h.deploymentState = state + return nil +} - if reconcileResult, err := r.prepareResourceForReconciliation(ctx, request, rs, log); err != nil { - if errors.IsNotFound(err) { - return workflow.Invalid("Object for reconciliation not found").ReconcileResult() - } - return reconcileResult, err +// updateStatus overrides the common controller's updateStatus to ensure that the deployment state +// is written after every status update. This ensures state consistency even on early returns. +// It must be executed only once per reconcile, and its result must be returned directly. +func (h *ReplicaSetReconcilerHelper) updateStatus(ctx context.Context, status workflow.Status, statusOptions ...mdbstatus.Option) (reconcile.Result, error) { + result, err := h.reconciler.ReconcileCommonController.updateStatus(ctx, h.resource, status, h.log, statusOptions...) 
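+ // If the status update itself failed, surface that error and skip persisting state.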
+ if err != nil { + return result, err + } + + // Write deployment state after every status update + if err := h.writeState(ctx); err != nil { + return h.reconciler.ReconcileCommonController.updateStatus(ctx, h.resource, workflow.Failed(xerrors.Errorf("Failed to write deployment state after updating status: %w", err)), h.log) } + return result, nil +} + +// Reconcile performs the full reconciliation logic for a replica set. +// This is the main entry point for all reconciliation work and contains all +// state and logic specific to a single reconcile execution. +func (h *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.Result, error) { + rs := h.resource + log := h.log + r := h.reconciler + if !architectures.IsRunningStaticArchitecture(rs.Annotations) { agents.UpgradeAllIfNeeded(ctx, agents.ClientSecret{Client: r.client, SecretClient: r.SecretClient}, r.omConnectionFactory, GetWatchedNamespace(), false) } @@ -136,37 +195,38 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco log.Infow("ReplicaSet.Spec", "spec", rs.Spec, "desiredReplicas", scale.ReplicasThisReconciliation(rs), "isScaling", scale.IsStillScaling(rs)) log.Infow("ReplicaSet.Status", "status", rs.Status) + // TODO: adapt validations to multi cluster if err := rs.ProcessValidationsOnReconcile(nil); err != nil { - return r.updateStatus(ctx, rs, workflow.Invalid("%s", err.Error()), log) + return h.updateStatus(ctx, workflow.Invalid("%s", err.Error())) } projectConfig, credsConfig, err := project.ReadConfigAndCredentials(ctx, r.client, r.SecretClient, rs, log) if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(err), log) + return h.updateStatus(ctx, workflow.Failed(err)) } conn, _, err := connection.PrepareOpsManagerConnection(ctx, r.SecretClient, projectConfig, credsConfig, r.omConnectionFactory, rs.Namespace, log) if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Ops Manager connection: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("Failed to prepare Ops Manager connection: %w", err))) } if status := ensureSupportedOpsManagerVersion(conn); status.Phase() != mdbstatus.PhaseRunning { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } r.SetupCommonWatchers(rs, nil, nil, rs.Name) reconcileResult := checkIfHasExcessProcesses(conn, rs.Name, log) if !reconcileResult.IsOK() { - return r.updateStatus(ctx, rs, reconcileResult, log) + return h.updateStatus(ctx, reconcileResult) } if status := validateMongoDBResource(rs, conn); !status.IsOK() { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } if status := controlledfeature.EnsureFeatureControls(*rs, conn, conn.OpsManagerVersion(), log); !status.IsOK() { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } // === 2. 
Auth and Certificates @@ -194,18 +254,18 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco prometheusCertHash, err := certs.EnsureTLSCertsForPrometheus(ctx, r.SecretClient, rs.GetNamespace(), rs.GetPrometheus(), certs.Database, log) if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("could not generate certificates for Prometheus: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("Could not generate certificates for Prometheus: %w", err))) } currentAgentAuthMode, err := conn.GetAgentAuthMode() if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("failed to get agent auth mode: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("failed to get agent auth mode: %w", err))) } // Check if we need to prepare for scale-down if scale.ReplicasThisReconciliation(rs) < rs.Status.Members { if err := replicaset.PrepareScaleDownFromMongoDB(conn, rs, log); err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err))) } } deploymentOpts := deploymentOptionsRS{ @@ -218,7 +278,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco // 3. Search Overrides // Apply search overrides early so searchCoordinator role is present before ensureRoles runs // This must happen before the ordering logic to ensure roles are synced regardless of order - shouldMirrorKeyfileForMongot := r.applySearchOverrides(ctx, rs, log) + shouldMirrorKeyfile := h.applySearchOverrides(ctx) // 4. Recovery @@ -227,8 +287,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco // See CLOUDP-189433 and CLOUDP-229222 for more details. if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) { log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition) - automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") - reconcileStatus := r.reconcileMemberResources(ctx, rs, conn, log, projectConfig, deploymentOpts) + automationConfigStatus := h.updateOmDeploymentRs(ctx, conn, rs.Status.Members, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfile, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") + reconcileStatus := h.reconcileMemberResources(ctx, conn, projectConfig, deploymentOpts) if !reconcileStatus.IsOK() { log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus) } @@ -238,54 +298,87 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco } // 5. 
Actual reconciliation execution, Ops Manager and kubernetes resources update - lastSpec, err := rs.GetLastSpec() - if err != nil { - lastSpec = &mdbv1.MongoDbSpec{} - } - publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, lastSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) + publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, h.deploymentState.LastAchievedSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) status := workflow.RunInGivenOrder(publishAutomationConfigFirst, func() workflow.Status { - return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") + return h.updateOmDeploymentRs(ctx, conn, rs.Status.Members, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfile, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") }, func() workflow.Status { - return r.reconcileMemberResources(ctx, rs, conn, log, projectConfig, deploymentOpts) + return h.reconcileMemberResources(ctx, conn, projectConfig, deploymentOpts) }) if !status.IsOK() { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } // === 6. Final steps if scale.IsStillScaling(rs) { - return r.updateStatus(ctx, rs, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), log, mdbstatus.MembersOption(rs)) + return h.updateStatus(ctx, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), mdbstatus.MembersOption(rs)) } - annotationsToAdd, err := getAnnotationsForResource(rs) - if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(err), log) + log.Infof("Finished reconciliation for MongoDbReplicaSet! 
%s", completionMessage(conn.BaseURL(), conn.GroupID())) + return h.updateStatus(ctx, workflow.OK(), mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) +} + +func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { + return &ReconcileMongoDbReplicaSet{ + ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), + omConnectionFactory: omFunc, + imageUrls: imageUrls, + forceEnterprise: forceEnterprise, + enableClusterMongoDBRoles: enableClusterMongoDBRoles, + + initDatabaseNonStaticImageVersion: initDatabaseNonStaticImageVersion, + databaseNonStaticImageVersion: databaseNonStaticImageVersion, } +} - if vault.IsVaultSecretBackend() { - secrets := rs.GetSecretsMountedIntoDBPod() - vaultMap := make(map[string]string) - for _, s := range secrets { - path := fmt.Sprintf("%s/%s/%s", r.VaultClient.DatabaseSecretMetadataPath(), rs.Namespace, s) - vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) - } - path := fmt.Sprintf("%s/%s/%s", r.VaultClient.OperatorScretMetadataPath(), rs.Namespace, rs.Spec.Credentials) - vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) - for k, val := range vaultMap { - annotationsToAdd[k] = val +type deploymentOptionsRS struct { + agentCertPath string + agentCertHash string + prometheusCertHash string + currentAgentAuthMode string +} + +// Generic Kubernetes Resources +// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=list;watch,namespace=placeholder +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch,namespace=placeholder +// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update,namespace=placeholder +// +kubebuilder:rbac:groups=core,resources={secrets,configmaps},verbs=get;list;watch;create;delete;update,namespace=placeholder +// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;get;list;watch;delete;update,namespace=placeholder + +// MongoDB Resource +// +kubebuilder:rbac:groups=mongodb.com,resources={mongodb,mongodb/status,mongodb/finalizers},verbs=*,namespace=placeholder + +// Setting up a webhook +// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=get;create;update;delete + +// Certificate generation +// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests,verbs=get;create;list;watch + +// Reconcile reads the state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read +// and what is in the MongoDbReplicaSet.Spec +func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) { + // === 1.
Initial Checks and setup + log := zap.S().With("ReplicaSet", request.NamespacedName) + rs := &mdbv1.MongoDB{} + + if reconcileResult, err := r.prepareResourceForReconciliation(ctx, request, rs, log); err != nil { + if errors.IsNotFound(err) { + return workflow.Invalid("Object for reconciliation not found").ReconcileResult() } + return reconcileResult, err } - if err := annotations.SetAnnotations(ctx, rs, annotationsToAdd, r.client); err != nil { + // Create helper for THIS reconciliation + helper, err := r.newReconcilerHelper(ctx, rs, log) + if err != nil { return r.updateStatus(ctx, rs, workflow.Failed(err), log) } - log.Infof("Finished reconciliation for MongoDbReplicaSet! %s", completionMessage(conn.BaseURL(), conn.GroupID())) - return r.updateStatus(ctx, rs, workflow.OK(), log, mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) + // Delegate all reconciliation logic to helper + return helper.Reconcile(ctx) } func publishAutomationConfigFirstRS(ctx context.Context, getter kubernetesClient.Client, mdb mdbv1.MongoDB, lastSpec *mdbv1.MongoDbSpec, currentAgentAuthMode string, sslMMSCAConfigMap string, log *zap.SugaredLogger) bool { @@ -339,7 +432,7 @@ func publishAutomationConfigFirstRS(ctx context.Context, getter kubernetesClient return false } -func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigMap { +func getHostnameOverrideConfigMapForReplicaset(mdb *mdbv1.MongoDB) corev1.ConfigMap { data := make(map[string]string) if mdb.Spec.DbCommonSpec.GetExternalDomain() != nil { @@ -359,12 +452,12 @@ func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigM return cm } -func (r *ReconcileMongoDbReplicaSet) reconcileHostnameOverrideConfigMap(ctx context.Context, log *zap.SugaredLogger, getUpdateCreator configmap.GetUpdateCreator, mdb mdbv1.MongoDB) error { - if mdb.Spec.DbCommonSpec.GetExternalDomain() == nil { +func (h *ReplicaSetReconcilerHelper) reconcileHostnameOverrideConfigMap(ctx context.Context, log *zap.SugaredLogger, getUpdateCreator configmap.GetUpdateCreator) error { + if h.resource.Spec.DbCommonSpec.GetExternalDomain() == nil { return nil } - cm := getHostnameOverrideConfigMapForReplicaset(mdb) + cm := getHostnameOverrideConfigMapForReplicaset(h.resource) err := configmap.CreateOrUpdate(ctx, getUpdateCreator, cm) if err != nil && !errors.IsAlreadyExists(err) { return xerrors.Errorf("failed to create configmap: %s, err: %w", cm.Name, err) @@ -377,11 +470,13 @@ func (r *ReconcileMongoDbReplicaSet) reconcileHostnameOverrideConfigMap(ctx cont // reconcileMemberResources handles the synchronization of kubernetes resources, which can be statefulsets, services etc. // All the resources required in the k8s cluster (as opposed to the automation config) for creating the replicaset // should be reconciled in this method. 
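+// In practice this currently covers the hostname override ConfigMap and the StatefulSet (see reconcileStatefulSet below).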
-func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Context, rs *mdbv1.MongoDB, conn om.Connection, - log *zap.SugaredLogger, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS, -) workflow.Status { +func (h *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { + rs := h.resource + r := h.reconciler + log := h.log + // Reconcile hostname override ConfigMap - if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil { + if err := h.reconcileHostnameOverrideConfigMap(ctx, log, h.reconciler.client); err != nil { return workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)) } @@ -390,12 +485,14 @@ func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Contex return status } - return r.reconcileStatefulSet(ctx, rs, log, conn, projectConfig, deploymentOptions) + return h.reconcileStatefulSet(ctx, conn, projectConfig, deploymentOptions) } -func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, rs *mdbv1.MongoDB, - log *zap.SugaredLogger, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS, -) workflow.Status { +func (h *ReplicaSetReconcilerHelper) reconcileStatefulSet(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { + rs := h.resource + r := h.reconciler + log := h.log + certConfigurator := certs.ReplicaSetX509CertConfigurator{MongoDB: rs, SecretClient: r.SecretClient} status := r.ensureX509SecretAndCheckTLSType(ctx, certConfigurator, deploymentOptions.currentAgentAuthMode, log) if !status.IsOK() { @@ -408,7 +505,7 @@ func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, r } // Build the replica set config - rsConfig, err := r.buildStatefulSetOptions(ctx, rs, conn, projectConfig, deploymentOptions.currentAgentAuthMode, deploymentOptions.prometheusCertHash, deploymentOptions.agentCertHash, log) + rsConfig, err := h.buildStatefulSetOptions(ctx, conn, projectConfig, deploymentOptions) if err != nil { return workflow.Failed(xerrors.Errorf("failed to build StatefulSet options: %w", err)) } @@ -420,6 +517,8 @@ func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, r if !workflowStatus.IsOK() { return workflowStatus } + + // TODO: check if updateStatus usage is correct here if workflow.ContainsPVCOption(workflowStatus.StatusOptions()) { _, _ = r.updateStatus(ctx, rs, workflow.Pending(""), log, workflowStatus.StatusOptions()...) 
} @@ -439,7 +538,11 @@ func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, r } // buildStatefulSetOptions creates the options needed for constructing the StatefulSet -func (r *ReconcileMongoDbReplicaSet) buildStatefulSetOptions(ctx context.Context, rs *mdbv1.MongoDB, conn om.Connection, projectConfig mdbv1.ProjectConfig, currentAgentAuthMode string, prometheusCertHash string, agentCertHash string, log *zap.SugaredLogger) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, error) { +func (h *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, error) { + rs := h.resource + r := h.reconciler + log := h.log + rsCertsConfig := certs.ReplicaSetConfig(*rs) var vaultConfig vault.VaultConfiguration @@ -468,11 +571,11 @@ func (r *ReconcileMongoDbReplicaSet) buildStatefulSetOptions(ctx context.Context rsConfig := construct.ReplicaSetOptions( PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)), - CurrentAgentAuthMechanism(currentAgentAuthMode), + CurrentAgentAuthMechanism(deploymentOptions.currentAgentAuthMode), CertificateHash(tlsCertHash), - AgentCertHash(agentCertHash), + AgentCertHash(deploymentOptions.agentCertHash), InternalClusterHash(internalClusterCertHash), - PrometheusTLSCertHash(prometheusCertHash), + PrometheusTLSCertHash(deploymentOptions.prometheusCertHash), WithVaultConfig(vaultConfig), WithLabels(rs.Labels), WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()), @@ -566,7 +669,10 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls // updateOmDeploymentRs performs the OM registration operation for the replicaset, so that the changes are eventually propagated // to automation agents in containers -func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, rs *mdbv1.MongoDB, log *zap.SugaredLogger, tlsCertPath, internalClusterCertPath string, deploymentOptionsRS deploymentOptionsRS, shouldMirrorKeyfileForMongot bool, isRecovering bool) workflow.Status { +func (h *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, tlsCertPath, internalClusterCertPath string, deploymentOpts deploymentOptionsRS, shouldMirrorKeyfile bool, isRecovering bool) workflow.Status { + rs := h.resource + log := h.log + r := h.reconciler log.Debug("Entering UpdateOMDeployments") // Only "concrete" RS members should be observed // - if scaling down, let's observe only members that will remain after scale-down operation @@ -580,7 +686,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath) // If the current operation is to disable TLS, then we should lock the current members of the Replica Set, // that is, do not scale them up or down until TLS disabling has completed. 
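+ // When the returned shouldLockMembers flag is set, the rest of this reconciliation keeps using membersNumberBefore rather than the desired member count.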
- shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, log, caFilePath, tlsCertPath) + shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, caFilePath, tlsCertPath, h.deploymentState.LastAchievedSpec, log) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -606,12 +712,12 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c replicaSet := replicaset.BuildFromMongoDBWithReplicas(r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, rs, updatedMembers, rs.CalculateFeatureCompatibilityVersion(), tlsCertPath) processNames := replicaSet.GetProcessNames() - status, additionalReconciliationRequired := r.updateOmAuthentication(ctx, conn, processNames, rs, deploymentOptionsRS.agentCertPath, caFilePath, internalClusterCertPath, isRecovering, log) + status, additionalReconciliationRequired := r.updateOmAuthentication(ctx, conn, processNames, rs, deploymentOpts.agentCertPath, caFilePath, internalClusterCertPath, isRecovering, log) if !status.IsOK() && !isRecovering { return status } - lastRsConfig, err := rs.GetLastAdditionalMongodConfigByType(mdbv1.ReplicaSetConfig) + lastRsConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(h.deploymentState.LastAchievedSpec, mdbv1.ReplicaSetConfig) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -621,13 +727,13 @@ conn: conn, secretsClient: r.SecretClient, namespace: rs.GetNamespace(), - prometheusCertHash: deploymentOptionsRS.prometheusCertHash, + prometheusCertHash: deploymentOpts.prometheusCertHash, } err = conn.ReadUpdateDeployment( func(d om.Deployment) error { - if shouldMirrorKeyfileForMongot { - if err := r.mirrorKeyfileIntoSecretForMongot(ctx, d, rs, log); err != nil { + if shouldMirrorKeyfile { + if err := h.mirrorKeyfileIntoSecretForMongot(ctx, d); err != nil { return err } } @@ -660,6 +766,7 @@ return workflow.Failed(err) } + // TODO: check if updateStatus usage is correct here if status := r.ensureBackupConfigurationAndUpdateStatus(ctx, conn, rs, r.SecretClient, log); !status.IsOK() && !isRecovering { return status } @@ -671,7 +778,7 @@ // updateOmDeploymentDisableTLSConfiguration checks if TLS configuration needs // to be disabled, in which case it will disable it and inform the calling // function. -func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage string, forceEnterprise bool, membersNumberBefore int, rs *mdbv1.MongoDB, log *zap.SugaredLogger, caFilePath, tlsCertPath string) (bool, error) { +func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage string, forceEnterprise bool, membersNumberBefore int, rs *mdbv1.MongoDB, caFilePath, tlsCertPath string, lastSpec *mdbv1.MongoDbSpec, log *zap.SugaredLogger) (bool, error) { tlsConfigWasDisabled := false err := conn.ReadUpdateDeployment( func(d om.Deployment) error { @@ -687,7 +794,7 @@ func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage // there's a scale up change at the same time). 
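+ // Building the processes from membersNumberBefore ensures that disabling TLS is never combined with a membership change in the same deployment update.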
replicaSet := replicaset.BuildFromMongoDBWithReplicas(mongoDBImage, forceEnterprise, rs, membersNumberBefore, rs.CalculateFeatureCompatibilityVersion(), tlsCertPath) - lastConfig, err := rs.GetLastAdditionalMongodConfigByType(mdbv1.ReplicaSetConfig) + lastConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(lastSpec, mdbv1.ReplicaSetConfig) if err != nil { return err } @@ -702,6 +809,7 @@ func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage return tlsConfigWasDisabled, err } +// TODO: split into subfunctions, follow helper pattern func (r *ReconcileMongoDbReplicaSet) OnDelete(ctx context.Context, obj runtime.Object, log *zap.SugaredLogger) error { rs := obj.(*mdbv1.MongoDB) @@ -771,8 +879,11 @@ func getAllHostsForReplicas(rs *mdbv1.MongoDB, membersCount int) []string { return hostnames } -func (r *ReconcileMongoDbReplicaSet) applySearchOverrides(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) bool { - search := r.lookupCorrespondingSearchResource(ctx, rs, log) +func (h *ReplicaSetReconcilerHelper) applySearchOverrides(ctx context.Context) bool { + rs := h.resource + log := h.log + + search := h.lookupCorrespondingSearchResource(ctx) if search == nil { log.Debugf("No MongoDBSearch resource found, skipping search overrides") return false @@ -798,7 +909,11 @@ func (r *ReconcileMongoDbReplicaSet) applySearchOverrides(ctx context.Context, r return true } -func (r *ReconcileMongoDbReplicaSet) mirrorKeyfileIntoSecretForMongot(ctx context.Context, d om.Deployment, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error { +func (h *ReplicaSetReconcilerHelper) mirrorKeyfileIntoSecretForMongot(ctx context.Context, d om.Deployment) error { + rs := h.resource + r := h.reconciler + log := h.log + keyfileContents := maputil.ReadMapValueAsString(d, "auth", "key") keyfileSecret := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%s-%s", rs.Name, searchcontroller.MongotKeyfileFilename), Namespace: rs.Namespace}} @@ -810,12 +925,15 @@ func (r *ReconcileMongoDbReplicaSet) mirrorKeyfileIntoSecretForMongot(ctx contex }) if err != nil { return xerrors.Errorf("Failed to mirror the replicaset's keyfile into a secret: %w", err) - } else { - return nil } + return nil } -func (r *ReconcileMongoDbReplicaSet) lookupCorrespondingSearchResource(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) *searchv1.MongoDBSearch { +func (h *ReplicaSetReconcilerHelper) lookupCorrespondingSearchResource(ctx context.Context) *searchv1.MongoDBSearch { + rs := h.resource + r := h.reconciler + log := h.log + var search *searchv1.MongoDBSearch searchList := &searchv1.MongoDBSearchList{} if err := r.client.List(ctx, searchList, &client.ListOptions{ diff --git a/controllers/operator/mongodbreplicaset_controller_test.go b/controllers/operator/mongodbreplicaset_controller_test.go index 787883ffb..dfda4efc9 100644 --- a/controllers/operator/mongodbreplicaset_controller_test.go +++ b/controllers/operator/mongodbreplicaset_controller_test.go @@ -398,22 +398,22 @@ func TestUpdateDeploymentTLSConfiguration(t *testing.T) { deploymentNoTLS := deployment.CreateFromReplicaSet("fake-mongoDBImage", false, rsNoTLS) // TLS Disabled -> TLS Disabled - shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsNoTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsNoTLS, 
util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.False(t, shouldLockMembers) // TLS Disabled -> TLS Enabled - shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsWithTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsWithTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.False(t, shouldLockMembers) // TLS Enabled -> TLS Enabled - shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsWithTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsWithTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.False(t, shouldLockMembers) // TLS Enabled -> TLS Disabled - shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsNoTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsNoTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.True(t, shouldLockMembers) }