From 485817637b6de1ca6aa982035e3ab37b95d4b77a Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 17 Jul 2025 12:59:12 +0100 Subject: [PATCH 1/4] WIP. --- api/v1alpha1/worker_types.go | 81 ++++++++ api/v1alpha1/zz_generated.deepcopy.go | 109 +++++++++- docs/version-patches.md | 149 ++++++++++++++ ...al.io_temporalworkerdeploymentpatches.yaml | 90 ++++++++ internal/controller/genplan.go | 17 +- internal/controller/patch_integration.go | 185 +++++++++++++++++ internal/controller/worker_controller.go | 35 +++- internal/planner/planner.go | 45 +++- internal/planner/planner_test.go | 193 ++++++++++++++++++ 9 files changed, 891 insertions(+), 13 deletions(-) create mode 100644 docs/version-patches.md create mode 100644 helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml create mode 100644 internal/controller/patch_integration.go diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index d8cd4111..2272a6b0 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -337,6 +337,87 @@ type TemporalWorkerDeploymentList struct { Items []TemporalWorkerDeployment `json:"items"` } +// TemporalWorkerDeploymentPatchSpec defines version-specific overrides for a TemporalWorkerDeployment +type TemporalWorkerDeploymentPatchSpec struct { + // TemporalWorkerDeploymentName is the name of the TemporalWorkerDeployment this patch applies to. + // The patch must be in the same namespace as the target deployment. + TemporalWorkerDeploymentName string `json:"temporalWorkerDeploymentName"` + + // VersionID specifies which version this patch applies to + VersionID string `json:"versionID"` + + // Replicas overrides the number of desired pods for this specific version. + // If not specified, the version will use the replicas from the main TemporalWorkerDeployment spec. + // +optional + Replicas *int32 `json:"replicas,omitempty"` + + // SunsetStrategy overrides how to manage sunsetting this specific version. + // If not specified, the version will use the sunset strategy from the main TemporalWorkerDeployment spec. + // +optional + SunsetStrategy *SunsetStrategy `json:"sunsetStrategy,omitempty"` +} + +// PatchStatus indicates the current state of the patch +// +enum +type PatchStatus string + +const ( + // PatchStatusActive indicates the patch is currently applied to an existing version + PatchStatusActive PatchStatus = "Active" + + // PatchStatusOrphaned indicates the referenced version no longer exists + PatchStatusOrphaned PatchStatus = "Orphaned" + + // PatchStatusInvalid indicates the patch references a TemporalWorkerDeployment that doesn't exist + PatchStatusInvalid PatchStatus = "Invalid" +) + +// TemporalWorkerDeploymentPatchStatus defines the observed state of TemporalWorkerDeploymentPatch +type TemporalWorkerDeploymentPatchStatus struct { + // Status indicates the current state of this patch + Status PatchStatus `json:"status"` + + // AppliedAt indicates when this patch was last successfully applied + // +optional + AppliedAt *metav1.Time `json:"appliedAt,omitempty"` + + // Message provides additional information about the patch status + // +optional + Message string `json:"message,omitempty"` + + // ObservedGeneration reflects the generation of the most recently observed patch spec + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status +// +kubebuilder:resource:shortName=twdpatch;twd-patch;temporalworkerpatch +//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".spec.temporalWorkerDeploymentName",description="Target TemporalWorkerDeployment" +//+kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.versionID",description="Target Version ID" +//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas",description="Override Replicas" +//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.status",description="Patch Status" +//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" + +// TemporalWorkerDeploymentPatch is the Schema for the temporalworkerdeploymentpatches API +type TemporalWorkerDeploymentPatch struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec TemporalWorkerDeploymentPatchSpec `json:"spec,omitempty"` + Status TemporalWorkerDeploymentPatchStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// TemporalWorkerDeploymentPatchList contains a list of TemporalWorkerDeploymentPatch +type TemporalWorkerDeploymentPatchList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []TemporalWorkerDeploymentPatch `json:"items"` +} + func init() { SchemeBuilder.Register(&TemporalWorkerDeployment{}, &TemporalWorkerDeploymentList{}) + SchemeBuilder.Register(&TemporalWorkerDeploymentPatch{}, &TemporalWorkerDeploymentPatchList{}) } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 90646487..90e2991b 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -7,7 +7,7 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -220,6 +220,10 @@ func (in *TargetWorkerDeploymentVersion) DeepCopyInto(out *TargetWorkerDeploymen in, out := &in.RampingSince, &out.RampingSince *out = (*in).DeepCopy() } + if in.RampLastModifiedAt != nil { + in, out := &in.RampLastModifiedAt, &out.RampLastModifiedAt + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetWorkerDeploymentVersion. @@ -395,6 +399,109 @@ func (in *TemporalWorkerDeploymentList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TemporalWorkerDeploymentPatch) DeepCopyInto(out *TemporalWorkerDeploymentPatch) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatch. +func (in *TemporalWorkerDeploymentPatch) DeepCopy() *TemporalWorkerDeploymentPatch { + if in == nil { + return nil + } + out := new(TemporalWorkerDeploymentPatch) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TemporalWorkerDeploymentPatch) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TemporalWorkerDeploymentPatchList) DeepCopyInto(out *TemporalWorkerDeploymentPatchList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TemporalWorkerDeploymentPatch, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatchList. +func (in *TemporalWorkerDeploymentPatchList) DeepCopy() *TemporalWorkerDeploymentPatchList { + if in == nil { + return nil + } + out := new(TemporalWorkerDeploymentPatchList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TemporalWorkerDeploymentPatchList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TemporalWorkerDeploymentPatchSpec) DeepCopyInto(out *TemporalWorkerDeploymentPatchSpec) { + *out = *in + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } + if in.SunsetStrategy != nil { + in, out := &in.SunsetStrategy, &out.SunsetStrategy + *out = new(SunsetStrategy) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatchSpec. +func (in *TemporalWorkerDeploymentPatchSpec) DeepCopy() *TemporalWorkerDeploymentPatchSpec { + if in == nil { + return nil + } + out := new(TemporalWorkerDeploymentPatchSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TemporalWorkerDeploymentPatchStatus) DeepCopyInto(out *TemporalWorkerDeploymentPatchStatus) { + *out = *in + if in.AppliedAt != nil { + in, out := &in.AppliedAt, &out.AppliedAt + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatchStatus. +func (in *TemporalWorkerDeploymentPatchStatus) DeepCopy() *TemporalWorkerDeploymentPatchStatus { + if in == nil { + return nil + } + out := new(TemporalWorkerDeploymentPatchStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TemporalWorkerDeploymentSpec) DeepCopyInto(out *TemporalWorkerDeploymentSpec) { *out = *in diff --git a/docs/version-patches.md b/docs/version-patches.md new file mode 100644 index 00000000..e88475dc --- /dev/null +++ b/docs/version-patches.md @@ -0,0 +1,149 @@ +# Version-Specific Configuration Patches + +The Temporal Worker Controller supports version-specific configuration overrides through the `TemporalWorkerDeploymentPatch` custom resource. This allows fine-grained control over individual worker deployment versions without modifying the main `TemporalWorkerDeployment` resource. + +## Overview + +Version patches enable you to: + +- **Scale specific versions independently**: Adjust replica counts for individual versions based on their workload or importance +- **Customize sunset strategies per version**: Apply different cleanup policies to versions based on their lifecycle needs +- **Maintain operational flexibility**: Make per-version adjustments without affecting the overall deployment strategy + +## Use Cases + +### 1. Scaling Down Deprecated Versions + +When a version is deprecated but still processing workflows, you might want to reduce its resource consumption: + +```yaml +apiVersion: temporal.io/v1alpha1 +kind: TemporalWorkerDeploymentPatch +metadata: + name: scale-down-deprecated-v1 +spec: + temporalWorkerDeploymentName: my-worker-deployment + versionID: "my-worker-deployment.v1.0.0" + replicas: 1 # Reduce from default 3 to 1 + sunsetStrategy: + scaledownDelay: 30m # Faster scaledown + deleteDelay: 2h # Quicker cleanup +``` + +### 2. Scaling Up Critical Versions + +For versions handling critical workflows that require higher availability: + +```yaml +apiVersion: temporal.io/v1alpha1 +kind: TemporalWorkerDeploymentPatch +metadata: + name: scale-up-critical-v2 +spec: + temporalWorkerDeploymentName: my-worker-deployment + versionID: "my-worker-deployment.v2.1.0" + replicas: 10 # Increase from default 3 to 10 + sunsetStrategy: + scaledownDelay: 24h # Keep running longer + deleteDelay: 7d # Preserve for a week +``` + + + +## Patch Status + +The controller automatically updates patch status to indicate whether patches are successfully applied: + +### Status Types + +- **Active**: The patch is applied to an existing version +- **Orphaned**: The referenced version no longer exists +- **Invalid**: The referenced TemporalWorkerDeployment doesn't exist + +### Status Fields + +```yaml +status: + status: Active + appliedAt: "2024-01-15T10:30:00Z" + message: "Patch successfully applied to active version" + observedGeneration: 1 +``` + +## Controller Behavior + +### Reconciliation + +1. **Patch Discovery**: The controller watches for changes to `TemporalWorkerDeploymentPatch` resources +2. **Status Updates**: Patch statuses are updated during each reconciliation loop +3. **Spec Application**: When creating or updating deployments, patches are applied to compute the effective configuration +4. **Event Propagation**: Changes to patches trigger reconciliation of the target `TemporalWorkerDeployment` + +### Conflict Resolution + +- If multiple patches target the same version and field, the last applied patch wins +- Patches are applied in alphabetical order by name for deterministic behavior +- Invalid patches are marked as such and do not affect deployment behavior + +## Best Practices + +### Naming Conventions + +Use descriptive names that indicate the purpose and target: + +```yaml +metadata: + name: scale-down-v1-deprecated + # or + name: extend-sunset-critical-v2 +``` + +### Lifecycle Management + +1. **Create patches proactively** for versions you know will need special handling +2. **Monitor patch statuses** to identify orphaned patches that can be cleaned up +3. **Use labels** to group related patches for easier management: + +```yaml +metadata: + labels: + temporal.io/version-class: deprecated + temporal.io/scaling-policy: conservative +``` + +### Resource Management + +- **Clean up orphaned patches** regularly to avoid resource accumulation +- **Co-locate patches** with their target deployments in the same namespace +- **Set appropriate RBAC** to control who can create/modify patches + +## Limitations + +1. **Namespace Scope**: Patches must be in the same namespace as their target `TemporalWorkerDeployment` +2. **Supported Fields**: Currently only `replicas` and `sunsetStrategy` can be overridden +3. **Version Scope**: Patches apply to specific version IDs, not version patterns +4. **Runtime Changes**: Patches are applied during deployment creation/update, not to existing deployments + +## Monitoring + +### Kubectl Commands + +```bash +# List all patches +kubectl get temporalworkerdeploymentpatches + +# Show patch details +kubectl describe twdpatch scale-down-old-version + +# Check patch status +kubectl get twdpatch -o custom-columns=NAME:.metadata.name,STATUS:.status.status,VERSION:.spec.versionID +``` + +### Metrics + +The controller provides metrics for: +- Number of active/orphaned/invalid patches +- Patch application success/failure rates +- Time since last patch status update + +This enables monitoring and alerting on patch health and effectiveness. \ No newline at end of file diff --git a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml new file mode 100644 index 00000000..9799ed7c --- /dev/null +++ b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml @@ -0,0 +1,90 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.2 + name: temporalworkerdeploymentpatches.temporal.io +spec: + group: temporal.io + names: + kind: TemporalWorkerDeploymentPatch + listKind: TemporalWorkerDeploymentPatchList + plural: temporalworkerdeploymentpatches + shortNames: + - twdpatch + - twd-patch + - temporalworkerpatch + singular: temporalworkerdeploymentpatch + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Target TemporalWorkerDeployment + jsonPath: .spec.temporalWorkerDeploymentName + name: Target + type: string + - description: Target Version ID + jsonPath: .spec.versionID + name: Version + type: string + - description: Override Replicas + jsonPath: .spec.replicas + name: Replicas + type: integer + - description: Patch Status + jsonPath: .status.status + name: Status + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + properties: + replicas: + format: int32 + type: integer + sunsetStrategy: + properties: + deleteDelay: + type: string + scaledownDelay: + type: string + type: object + temporalWorkerDeploymentName: + type: string + versionID: + type: string + required: + - temporalWorkerDeploymentName + - versionID + type: object + status: + properties: + appliedAt: + format: date-time + type: string + message: + type: string + observedGeneration: + format: int64 + type: integer + status: + type: string + required: + - status + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index 394927e7..84731c46 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -66,6 +66,19 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err) } + // Gather version-specific patches + versionPatches := make(map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) + if r.PatchApplier != nil { + patches, err := r.PatchApplier.getPatches(ctx, w) + if err != nil { + l.Error(err, "failed to get patches, continuing without them") + } else { + for _, patch := range patches { + versionPatches[patch.Spec.VersionID] = &patch.Spec + } + } + } + // Create a simple plan structure plan := &plan{ TemporalNamespace: w.Spec.WorkerOptions.TemporalNamespace, @@ -88,6 +101,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( TargetVersionID: targetVersionID, Replicas: *w.Spec.Replicas, ConflictToken: w.Status.VersionConflictToken, + VersionPatches: versionPatches, } planResult, err := planner.GeneratePlan( @@ -119,7 +133,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Handle deployment creation if needed if planResult.ShouldCreateDeployment { _, buildID, _ := k8s.SplitVersionID(targetVersionID) - d, err := r.newDeployment(w, buildID, connection) + d, err := r.newDeployment(ctx, w, buildID, connection) if err != nil { return nil, err } @@ -131,6 +145,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Create a new deployment with owner reference func (r *TemporalWorkerDeploymentReconciler) newDeployment( + ctx context.Context, w *temporaliov1alpha1.TemporalWorkerDeployment, buildID string, connection temporaliov1alpha1.TemporalConnectionSpec, diff --git a/internal/controller/patch_integration.go b/internal/controller/patch_integration.go new file mode 100644 index 00000000..0cbb2b1c --- /dev/null +++ b/internal/controller/patch_integration.go @@ -0,0 +1,185 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. + +package controller + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// PatchApplier contains the logic for applying version-specific overrides +type PatchApplier struct { + client client.Client + logger logr.Logger +} + +// NewPatchApplier creates a new PatchApplier +func NewPatchApplier(client client.Client, logger logr.Logger) *PatchApplier { + return &PatchApplier{ + client: client, + logger: logger, + } +} + +// getPatches retrieves all patches targeting the given TemporalWorkerDeployment +func (pa *PatchApplier) getPatches( + ctx context.Context, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) ([]temporaliov1alpha1.TemporalWorkerDeploymentPatch, error) { + patchList := &temporaliov1alpha1.TemporalWorkerDeploymentPatchList{} + + // List all patches in the same namespace targeting this worker deployment + err := pa.client.List(ctx, patchList, client.MatchingFields{ + patchTargetKey: workerDeployment.Name, + }, client.InNamespace(workerDeployment.Namespace)) + if err != nil { + return nil, fmt.Errorf("failed to list patches: %w", err) + } + + return patchList.Items, nil +} + +// UpdatePatchStatuses updates the status of patches based on the current state of versions +func (pa *PatchApplier) UpdatePatchStatuses( + ctx context.Context, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) error { + patches, err := pa.getPatches(ctx, workerDeployment) + if err != nil { + return fmt.Errorf("failed to get patches: %w", err) + } + + // Get current version IDs from the worker deployment status + activeVersions := pa.getActiveVersionIDs(workerDeployment) + + for _, patch := range patches { + newStatus := pa.determinePatchStatus(patch, workerDeployment, activeVersions) + + if patch.Status.Status != newStatus { + patch.Status.Status = newStatus + patch.Status.ObservedGeneration = patch.Generation + + switch newStatus { + case temporaliov1alpha1.PatchStatusActive: + now := metav1.Now() + patch.Status.AppliedAt = &now + patch.Status.Message = "Patch successfully applied to active version" + case temporaliov1alpha1.PatchStatusOrphaned: + patch.Status.Message = "Referenced version no longer exists" + case temporaliov1alpha1.PatchStatusInvalid: + patch.Status.Message = "Referenced TemporalWorkerDeployment not found" + } + + if err := pa.client.Status().Update(ctx, &patch); err != nil { + pa.logger.Error(err, "Failed to update patch status", "patch", patch.Name) + continue + } + } + } + + return nil +} + +// determinePatchStatus determines the appropriate status for a patch +func (pa *PatchApplier) determinePatchStatus( + patch temporaliov1alpha1.TemporalWorkerDeploymentPatch, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, + activeVersions map[string]bool, +) temporaliov1alpha1.PatchStatus { + // Check if the target deployment name matches (namespace is assumed to be the same) + if workerDeployment.Name != patch.Spec.TemporalWorkerDeploymentName { + return temporaliov1alpha1.PatchStatusInvalid + } + + // Check if the version exists + if activeVersions[patch.Spec.VersionID] { + return temporaliov1alpha1.PatchStatusActive + } + + return temporaliov1alpha1.PatchStatusOrphaned +} + +// getActiveVersionIDs extracts all active version IDs from the worker deployment status +func (pa *PatchApplier) getActiveVersionIDs( + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) map[string]bool { + versions := make(map[string]bool) + + if workerDeployment.Status.CurrentVersion != nil { + versions[workerDeployment.Status.CurrentVersion.VersionID] = true + } + + if workerDeployment.Status.TargetVersion != nil { + versions[workerDeployment.Status.TargetVersion.VersionID] = true + } + + if workerDeployment.Status.RampingVersion != nil { + versions[workerDeployment.Status.RampingVersion.VersionID] = true + } + + for _, deprecatedVersion := range workerDeployment.Status.DeprecatedVersions { + versions[deprecatedVersion.VersionID] = true + } + + return versions +} + +// enqueuePatchHandler handles events from TemporalWorkerDeploymentPatch resources +// and enqueues reconciliation requests for the target TemporalWorkerDeployment +type enqueuePatchHandler struct { + client client.Client +} + +// Create implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Create(ctx context.Context, e event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.Object, q) +} + +// Update implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Update(ctx context.Context, e event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.ObjectNew, q) +} + +// Delete implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Delete(ctx context.Context, e event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.Object, q) +} + +// Generic implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Generic(ctx context.Context, e event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.Object, q) +} + +// enqueuePatch enqueues a reconciliation request for the TemporalWorkerDeployment +// that this patch targets +func (h *enqueuePatchHandler) enqueuePatch(obj client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + patch, ok := obj.(*temporaliov1alpha1.TemporalWorkerDeploymentPatch) + if !ok { + return + } + + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: patch.Spec.TemporalWorkerDeploymentName, + Namespace: patch.Namespace, + }, + } + + q.Add(req) +} + +// Ensure enqueuePatchHandler implements the correct event handler interface +var _ handler.TypedEventHandler[client.Object, reconcile.Request] = &enqueuePatchHandler{} diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index a3e849ae..bb5cb9c1 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -28,9 +28,7 @@ var ( ) const ( - // TODO(jlegrone): add this everywhere - deployOwnerKey = ".metadata.controller" - buildIDLabel = "temporal.io/build-id" + buildIDLabel = "temporal.io/build-id" ) // TemporalWorkerDeploymentReconciler reconciles a TemporalWorkerDeployment object @@ -38,11 +36,15 @@ type TemporalWorkerDeploymentReconciler struct { client.Client Scheme *runtime.Scheme TemporalClientPool *clientpool.ClientPool + PatchApplier *PatchApplier } //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update +//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeploymentpatches,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeploymentpatches/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeploymentpatches/finalizers,verbs=update //+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete @@ -143,6 +145,11 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + // Initialize PatchApplier if not already set + if r.PatchApplier == nil { + r.PatchApplier = NewPatchApplier(r.Client, l) + } + // TODO(jlegrone): Set defaults via webhook rather than manually // (defaults were already set above, but have to be set again after status update) if err := workerDeploy.Default(ctx, &workerDeploy); err != nil { @@ -161,6 +168,12 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + // Update patch statuses after successful plan execution + if err := r.PatchApplier.UpdatePatchStatuses(ctx, &workerDeploy); err != nil { + l.Error(err, "failed to update patch statuses") + // Don't fail reconciliation if patch status update fails + } + return ctrl.Result{ Requeue: true, // TODO(jlegrone): Consider increasing this value if the only thing we need to check for is unreachable versions. @@ -170,6 +183,13 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }, nil } +const ( + // Index key for deployments owned by TemporalWorkerDeployment + deployOwnerKey = ".metadata.controller" + // Index key for patches targeting a TemporalWorkerDeployment (for future use) + patchTargetKey = ".spec.temporalWorkerDeploymentName" +) + // SetupWithManager sets up the controller with the Manager. func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { @@ -192,9 +212,18 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) return err } + // Index patches by their target TemporalWorkerDeployment + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &temporaliov1alpha1.TemporalWorkerDeploymentPatch{}, patchTargetKey, func(rawObj client.Object) []string { + patch := rawObj.(*temporaliov1alpha1.TemporalWorkerDeploymentPatch) + return []string{patch.Spec.TemporalWorkerDeploymentName} + }); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&temporaliov1alpha1.TemporalWorkerDeployment{}). Owns(&appsv1.Deployment{}). + Watches(&temporaliov1alpha1.TemporalWorkerDeploymentPatch{}, &enqueuePatchHandler{client: mgr.GetClient()}). WithOptions(controller.Options{ MaxConcurrentReconciles: 100, }). diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 044eaec5..b16cff35 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -64,6 +64,8 @@ type Config struct { Replicas int32 // Token to use for conflict detection ConflictToken []byte + // Version-specific patches that override replicas and sunset strategies + VersionPatches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec } // ScaledownDelay returns the scaledown delay from the sunset strategy @@ -82,6 +84,30 @@ func getDeleteDelay(spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec) time. return spec.SunsetStrategy.DeleteDelay.Duration } +// getEffectiveReplicas returns the effective replicas for a version, considering patches +func getEffectiveReplicas(versionID string, baseReplicas int32, patches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) int32 { + if patch, exists := patches[versionID]; exists && patch.Replicas != nil { + return *patch.Replicas + } + return baseReplicas +} + +// getEffectiveScaledownDelay returns the effective scaledown delay for a version, considering patches +func getEffectiveScaledownDelay(versionID string, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, patches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) time.Duration { + if patch, exists := patches[versionID]; exists && patch.SunsetStrategy != nil && patch.SunsetStrategy.ScaledownDelay != nil { + return patch.SunsetStrategy.ScaledownDelay.Duration + } + return getScaledownDelay(spec) +} + +// getEffectiveDeleteDelay returns the effective delete delay for a version, considering patches +func getEffectiveDeleteDelay(versionID string, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, patches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) time.Duration { + if patch, exists := patches[versionID]; exists && patch.SunsetStrategy != nil && patch.SunsetStrategy.DeleteDelay != nil { + return patch.SunsetStrategy.DeleteDelay.Duration + } + return getDeleteDelay(spec) +} + // GeneratePlan creates a plan for updating the worker deployment func GeneratePlan( l logr.Logger, @@ -132,7 +158,7 @@ func getDeleteDeployments( // Deleting a deployment is only possible when: // 1. The deployment has been drained for deleteDelay + scaledownDelay. // 2. The deployment is scaled to 0 replicas. - if (time.Since(version.DrainedSince.Time) > getDeleteDelay(config.Spec)+getScaledownDelay(config.Spec)) && + if (time.Since(version.DrainedSince.Time) > getEffectiveDeleteDelay(version.VersionID, config.Spec, config.VersionPatches)+getEffectiveScaledownDelay(version.VersionID, config.Spec, config.VersionPatches)) && *d.Spec.Replicas == 0 { deleteDeployments = append(deleteDeployments, d) } @@ -167,8 +193,9 @@ func getScaleDeployments( if config.Status.CurrentVersion != nil && config.Status.CurrentVersion.Deployment != nil { ref := config.Status.CurrentVersion.Deployment if d, exists := k8sState.Deployments[config.Status.CurrentVersion.VersionID]; exists { - if d.Spec.Replicas != nil && *d.Spec.Replicas != config.Replicas { - scaleDeployments[ref] = uint32(config.Replicas) + effectiveReplicas := getEffectiveReplicas(config.Status.CurrentVersion.VersionID, config.Replicas, config.VersionPatches) + if d.Spec.Replicas != nil && *d.Spec.Replicas != effectiveReplicas { + scaleDeployments[ref] = uint32(effectiveReplicas) } } } @@ -190,11 +217,12 @@ func getScaleDeployments( temporaliov1alpha1.VersionStatusCurrent: // TODO(carlydf): Consolidate scale up cases and verify that scale up is the correct action for inactive versions // Scale up these deployments - if d.Spec.Replicas != nil && *d.Spec.Replicas != config.Replicas { - scaleDeployments[version.Deployment] = uint32(config.Replicas) + effectiveReplicas := getEffectiveReplicas(version.VersionID, config.Replicas, config.VersionPatches) + if d.Spec.Replicas != nil && *d.Spec.Replicas != effectiveReplicas { + scaleDeployments[version.Deployment] = uint32(effectiveReplicas) } case temporaliov1alpha1.VersionStatusDrained: - if time.Since(version.DrainedSince.Time) > getScaledownDelay(config.Spec) { + if time.Since(version.DrainedSince.Time) > getEffectiveScaledownDelay(version.VersionID, config.Spec, config.VersionPatches) { // TODO(jlegrone): Compute scale based on load? Or percentage of replicas? // Scale down drained deployments after delay if d.Spec.Replicas != nil && *d.Spec.Replicas != 0 { @@ -208,8 +236,9 @@ func getScaleDeployments( if config.Status.TargetVersion != nil && config.Status.TargetVersion.Deployment != nil && config.Status.TargetVersion.VersionID == config.TargetVersionID { if d, exists := k8sState.Deployments[config.Status.TargetVersion.VersionID]; exists { - if d.Spec.Replicas == nil || *d.Spec.Replicas != config.Replicas { - scaleDeployments[config.Status.TargetVersion.Deployment] = uint32(config.Replicas) + effectiveReplicas := getEffectiveReplicas(config.Status.TargetVersion.VersionID, config.Replicas, config.VersionPatches) + if d.Spec.Replicas == nil || *d.Spec.Replicas != effectiveReplicas { + scaleDeployments[config.Status.TargetVersion.Deployment] = uint32(effectiveReplicas) } } } diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index 2f46ed0b..a0beacd5 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -1703,3 +1703,196 @@ func rolloutStep(ramp float32, d time.Duration) temporaliov1alpha1.RolloutStep { PauseDuration: metav1Duration(d), } } + +// Helper function to ensure a config has an empty version patches map +func ensureVersionPatches(config *Config) { + if config.VersionPatches == nil { + config.VersionPatches = make(map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) + } +} + +func TestVersionPatches(t *testing.T) { + testCases := []struct { + name string + k8sState *k8s.DeploymentState + config *Config + expectScales int + expectReplicas map[string]uint32 // version ID -> expected replicas + expectDeletes int + }{ + { + name: "patch overrides replicas for deprecated version", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "test/namespace.v1": createDeploymentWithReplicas(5), // Current version already at desired replicas + "test/namespace.v2": createDeploymentWithReplicas(1), // Deprecated version with 1 replica + }, + DeploymentRefs: map[string]*v1.ObjectReference{ + "test/namespace.v1": {Name: "test-v1"}, + "test/namespace.v2": {Name: "test-v2"}, + }, + }, + config: &Config{ + TargetVersionID: "test/namespace.v1", + Status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v2", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &v1.ObjectReference{Name: "test-v2"}, + }, + }, + }, + }, + Spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{}, + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + Replicas: 5, // Base desired replicas + ConflictToken: []byte{}, + VersionPatches: map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec{ + "test/namespace.v2": { + VersionID: "test/namespace.v2", + Replicas: func() *int32 { r := int32(10); return &r }(), // Patch overrides to 10 replicas + }, + }, + }, + expectScales: 1, // Only deprecated version needs scaling + expectReplicas: map[string]uint32{ + "test/namespace.v2": 10, // Should use patched value, not base replicas + }, + }, + { + name: "patch overrides sunset delay for drained version", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "test/namespace.v1": createDeploymentWithReplicas(3), + "test/namespace.v2": createDeploymentWithReplicas(2), // Drained version still has replicas + }, + DeploymentRefs: map[string]*v1.ObjectReference{ + "test/namespace.v1": {Name: "test-v1"}, + "test/namespace.v2": {Name: "test-v2"}, + }, + }, + config: &Config{ + TargetVersionID: "test/namespace.v1", + Status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v2", + Status: temporaliov1alpha1.VersionStatusDrained, + Deployment: &v1.ObjectReference{Name: "test-v2"}, + }, + DrainedSince: &metav1.Time{ + Time: time.Now().Add(-30 * time.Minute), // Drained 30 minutes ago + }, + }, + }, + }, + Spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + SunsetStrategy: temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{Duration: 2 * time.Hour}, // Base delay: 2 hours + }, + }, + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + Replicas: 3, + ConflictToken: []byte{}, + VersionPatches: map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec{ + "test/namespace.v2": { + VersionID: "test/namespace.v2", + SunsetStrategy: &temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{Duration: 15 * time.Minute}, // Patch: much shorter delay + }, + }, + }, + }, + expectScales: 1, // Should scale down because patched delay is shorter + expectReplicas: map[string]uint32{ + "test/namespace.v2": 0, // Should scale to 0 due to patch override + }, + }, + { + name: "no patches applied when none exist", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "test/namespace.v1": createDeploymentWithReplicas(5), // Already at desired replicas + "test/namespace.v2": createDeploymentWithReplicas(1), + }, + DeploymentRefs: map[string]*v1.ObjectReference{ + "test/namespace.v1": {Name: "test-v1"}, + "test/namespace.v2": {Name: "test-v2"}, + }, + }, + config: &Config{ + TargetVersionID: "test/namespace.v1", + Status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v2", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &v1.ObjectReference{Name: "test-v2"}, + }, + }, + }, + }, + Spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{}, + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + Replicas: 5, + ConflictToken: []byte{}, + VersionPatches: make(map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec), // No patches + }, + expectScales: 1, // Only deprecated version needs scaling + expectReplicas: map[string]uint32{ + "test/namespace.v2": 5, // Should use base replicas (no patch) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.config) + require.NoError(t, err) + + assert.Equal(t, tc.expectScales, len(plan.ScaleDeployments), "unexpected number of scales") + assert.Equal(t, tc.expectDeletes, len(plan.DeleteDeployments), "unexpected number of deletes") + + // Verify the specific replicas for each deployment + for versionID, expectedReplicas := range tc.expectReplicas { + found := false + for objRef, actualReplicas := range plan.ScaleDeployments { + // Find the deployment reference that matches this version + if tc.k8sState.DeploymentRefs[versionID] != nil && + tc.k8sState.DeploymentRefs[versionID].Name == objRef.Name { + assert.Equal(t, expectedReplicas, actualReplicas, + "unexpected replicas for version %s", versionID) + found = true + break + } + } + assert.True(t, found, "expected scaling operation for version %s not found", versionID) + } + }) + } +} From 25c31b2f20175ca0471fdc57f5bee64508ce165d Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Tue, 22 Jul 2025 16:05:03 +0100 Subject: [PATCH 2/4] Cleaner interface. --- api/v1alpha1/worker_types.go | 5 -- internal/controller/genplan.go | 18 +++-- ...tch_integration.go => patch_management.go} | 74 +++++-------------- internal/controller/worker_controller.go | 10 +-- 4 files changed, 30 insertions(+), 77 deletions(-) rename internal/controller/{patch_integration.go => patch_management.go} (68%) diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index fd4870f9..b2b26844 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -390,10 +390,6 @@ type TemporalWorkerDeploymentPatchStatus struct { // +optional AppliedAt *metav1.Time `json:"appliedAt,omitempty"` - // Message provides additional information about the patch status - // +optional - Message string `json:"message,omitempty"` - // ObservedGeneration reflects the generation of the most recently observed patch spec // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` @@ -404,7 +400,6 @@ type TemporalWorkerDeploymentPatchStatus struct { // +kubebuilder:resource:shortName=twdpatch;twd-patch;temporalworkerpatch //+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".spec.temporalWorkerDeploymentName",description="Target TemporalWorkerDeployment" //+kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.versionID",description="Target Version ID" -//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas",description="Override Replicas" //+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.status",description="Patch Status" //+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index ef3f09e1..0f72c8e0 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -13,6 +13,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/k8s" @@ -70,14 +71,15 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Gather version-specific patches versionPatches := make(map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) - if r.PatchApplier != nil { - patches, err := r.PatchApplier.getPatches(ctx, w) - if err != nil { - l.Error(err, "failed to get patches, continuing without them") - } else { - for _, patch := range patches { - versionPatches[patch.Spec.VersionID] = &patch.Spec - } + patchList := &temporaliov1alpha1.TemporalWorkerDeploymentPatchList{} + err = r.Client.List(ctx, patchList, client.MatchingFields{ + patchTargetKey: w.Name, + }, client.InNamespace(w.Namespace)) + if err != nil { + l.Error(err, "failed to get patches, continuing without them") + } else { + for _, patch := range patchList.Items { + versionPatches[patch.Spec.VersionID] = &patch.Spec } } diff --git a/internal/controller/patch_integration.go b/internal/controller/patch_management.go similarity index 68% rename from internal/controller/patch_integration.go rename to internal/controller/patch_management.go index 15c0dc1e..77f9dd0d 100644 --- a/internal/controller/patch_integration.go +++ b/internal/controller/patch_management.go @@ -9,82 +9,44 @@ import ( "fmt" "github.com/go-logr/logr" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" - - temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -// PatchApplier contains the logic for applying version-specific overrides -type PatchApplier struct { - client client.Client - logger logr.Logger -} -// NewPatchApplier creates a new PatchApplier -func NewPatchApplier(client client.Client, logger logr.Logger) *PatchApplier { - return &PatchApplier{ - client: client, - logger: logger, - } -} + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" +) -// getPatches retrieves all patches targeting the given TemporalWorkerDeployment -func (pa *PatchApplier) getPatches( +// updatePatchStatuses updates the status of patches based on the current state of versions +func updatePatchStatuses( ctx context.Context, + k8sClient client.Client, + logger logr.Logger, workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, -) ([]temporaliov1alpha1.TemporalWorkerDeploymentPatch, error) { +) error { + // List all patches targeting this worker deployment patchList := &temporaliov1alpha1.TemporalWorkerDeploymentPatchList{} - - // List all patches in the same namespace targeting this worker deployment - err := pa.client.List(ctx, patchList, client.MatchingFields{ + err := k8sClient.List(ctx, patchList, client.MatchingFields{ patchTargetKey: workerDeployment.Name, }, client.InNamespace(workerDeployment.Namespace)) if err != nil { - return nil, fmt.Errorf("failed to list patches: %w", err) - } - - return patchList.Items, nil -} - -// UpdatePatchStatuses updates the status of patches based on the current state of versions -func (pa *PatchApplier) UpdatePatchStatuses( - ctx context.Context, - workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, -) error { - patches, err := pa.getPatches(ctx, workerDeployment) - if err != nil { - return fmt.Errorf("failed to get patches: %w", err) + return fmt.Errorf("failed to list patches: %w", err) } // Get current version IDs from the worker deployment status - activeVersions := pa.getActiveVersionIDs(workerDeployment) + activeVersions := getActiveVersionIDs(workerDeployment) - for _, patch := range patches { - newStatus := pa.determinePatchStatus(patch, workerDeployment, activeVersions) + for _, patch := range patchList.Items { + newStatus := determinePatchStatus(patch, workerDeployment, activeVersions) if patch.Status.Status != newStatus { patch.Status.Status = newStatus patch.Status.ObservedGeneration = patch.Generation - switch newStatus { - case temporaliov1alpha1.PatchStatusActive: - now := metav1.Now() - patch.Status.AppliedAt = &now - patch.Status.Message = "Patch successfully applied to active version" - case temporaliov1alpha1.PatchStatusOrphaned: - patch.Status.Message = "Referenced version no longer exists" - case temporaliov1alpha1.PatchStatusInvalid: - patch.Status.Message = "Referenced TemporalWorkerDeployment not found" - } - - if err := pa.client.Status().Update(ctx, &patch); err != nil { - pa.logger.Error(err, "Failed to update patch status", "patch", patch.Name) + if err := k8sClient.Status().Update(ctx, &patch); err != nil { + logger.Error(err, "Failed to update patch status", "patch", patch.Name) continue } } @@ -94,7 +56,7 @@ func (pa *PatchApplier) UpdatePatchStatuses( } // determinePatchStatus determines the appropriate status for a patch -func (pa *PatchApplier) determinePatchStatus( +func determinePatchStatus( patch temporaliov1alpha1.TemporalWorkerDeploymentPatch, workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, activeVersions map[string]bool, @@ -113,7 +75,7 @@ func (pa *PatchApplier) determinePatchStatus( } // getActiveVersionIDs extracts all active version IDs from the worker deployment status -func (pa *PatchApplier) getActiveVersionIDs( +func getActiveVersionIDs( workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, ) map[string]bool { versions := make(map[string]bool) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 3e6f4064..24146dc3 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -41,7 +41,6 @@ type TemporalWorkerDeploymentReconciler struct { client.Client Scheme *runtime.Scheme TemporalClientPool *clientpool.ClientPool - PatchApplier *PatchApplier } //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete @@ -162,11 +161,6 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } - // Initialize PatchApplier if not already set - if r.PatchApplier == nil { - r.PatchApplier = NewPatchApplier(r.Client, l) - } - // TODO(jlegrone): Set defaults via webhook rather than manually // (defaults were already set above, but have to be set again after status update) if err := workerDeploy.Default(ctx, &workerDeploy); err != nil { @@ -186,7 +180,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req } // Update patch statuses after successful plan execution - if err := r.PatchApplier.UpdatePatchStatuses(ctx, &workerDeploy); err != nil { + if err := updatePatchStatuses(ctx, r.Client, l, &workerDeploy); err != nil { l.Error(err, "failed to update patch statuses") // Don't fail reconciliation if patch status update fails } @@ -201,7 +195,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req } const ( - // Index key for patches targeting a TemporalWorkerDeployment (for future use) + // Index key for patches targeting a TemporalWorkerDeployment patchTargetKey = ".spec.temporalWorkerDeploymentName" ) From 829d79f38cd92eb647137e7f3be5b2efe23c51f8 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Tue, 22 Jul 2025 16:33:28 +0100 Subject: [PATCH 3/4] Doc tweaks. --- docs/version-patches.md | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/docs/version-patches.md b/docs/version-patches.md index e88475dc..a78666f0 100644 --- a/docs/version-patches.md +++ b/docs/version-patches.md @@ -48,8 +48,6 @@ spec: deleteDelay: 7d # Preserve for a week ``` - - ## Patch Status The controller automatically updates patch status to indicate whether patches are successfully applied: @@ -66,7 +64,6 @@ The controller automatically updates patch status to indicate whether patches ar status: status: Active appliedAt: "2024-01-15T10:30:00Z" - message: "Patch successfully applied to active version" observedGeneration: 1 ``` @@ -76,9 +73,19 @@ status: 1. **Patch Discovery**: The controller watches for changes to `TemporalWorkerDeploymentPatch` resources 2. **Status Updates**: Patch statuses are updated during each reconciliation loop -3. **Spec Application**: When creating or updating deployments, patches are applied to compute the effective configuration +3. **Continuous Application**: During every reconciliation, patches are applied to compute effective configuration for all operations 4. **Event Propagation**: Changes to patches trigger reconciliation of the target `TemporalWorkerDeployment` +### How Patches Are Applied + +Patches are applied **continuously during each reconciliation loop**, affecting: + +- **Scaling Operations**: Existing deployments are scaled immediately when patch replica counts differ from current state +- **Sunset Decisions**: Patch sunset strategies are used to determine when to scale down and delete deployments +- **New Deployment Creation**: When new deployments are created, they use the effective configuration including patches + +The controller actively monitors the difference between current deployment state and the effective configuration (including patches) and takes action to reconcile any differences. + ### Conflict Resolution - If multiple patches target the same version and field, the last applied patch wins @@ -120,9 +127,9 @@ metadata: ## Limitations 1. **Namespace Scope**: Patches must be in the same namespace as their target `TemporalWorkerDeployment` -2. **Supported Fields**: Currently only `replicas` and `sunsetStrategy` can be overridden +2. **Supported Fields**: Currently only `replicas` and `sunsetStrategy` fields can be overridden 3. **Version Scope**: Patches apply to specific version IDs, not version patterns -4. **Runtime Changes**: Patches are applied during deployment creation/update, not to existing deployments +4. **Immediate Effect**: Patch changes take effect during the next reconciliation loop (typically within 10 seconds) ## Monitoring From dab42b045980ac247600b8a0ca7b4846e101d2bc Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 24 Jul 2025 16:34:30 +0100 Subject: [PATCH 4/4] Set owner references on patches. --- internal/controller/patch_management.go | 56 +++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/internal/controller/patch_management.go b/internal/controller/patch_management.go index 77f9dd0d..5884c1fa 100644 --- a/internal/controller/patch_management.go +++ b/internal/controller/patch_management.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // updatePatchStatuses updates the status of patches based on the current state of versions @@ -39,12 +40,33 @@ func updatePatchStatuses( activeVersions := getActiveVersionIDs(workerDeployment) for _, patch := range patchList.Items { + patchNeedsUpdate := false + statusNeedsUpdate := false + + // Check if we need to set OwnerReference + if !hasOwnerReference(&patch, workerDeployment) { + setOwnerReference(&patch, workerDeployment) + patchNeedsUpdate = true + } + newStatus := determinePatchStatus(patch, workerDeployment, activeVersions) if patch.Status.Status != newStatus { patch.Status.Status = newStatus patch.Status.ObservedGeneration = patch.Generation + statusNeedsUpdate = true + } + // Update the patch object if OwnerReference was added + if patchNeedsUpdate { + if err := k8sClient.Update(ctx, &patch); err != nil { + logger.Error(err, "Failed to update patch", "patch", patch.Name) + continue + } + } + + // Update status separately if status was changed + if statusNeedsUpdate { if err := k8sClient.Status().Update(ctx, &patch); err != nil { logger.Error(err, "Failed to update patch status", "patch", patch.Name) continue @@ -139,3 +161,37 @@ func (h *enqueuePatchHandler) enqueuePatch(obj client.Object, q workqueue.TypedR // Ensure enqueuePatchHandler implements the correct event handler interface var _ handler.TypedEventHandler[client.Object, reconcile.Request] = &enqueuePatchHandler{} + +// hasOwnerReference checks if the patch already has an OwnerReference to the specified TemporalWorkerDeployment +func hasOwnerReference( + patch *temporaliov1alpha1.TemporalWorkerDeploymentPatch, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) bool { + for _, ownerRef := range patch.GetOwnerReferences() { + if ownerRef.UID == workerDeployment.GetUID() && + ownerRef.Kind == "TemporalWorkerDeployment" && + ownerRef.APIVersion == temporaliov1alpha1.GroupVersion.String() { + return true + } + } + return false +} + +// setOwnerReference sets the OwnerReference on the patch to point to the TemporalWorkerDeployment +func setOwnerReference( + patch *temporaliov1alpha1.TemporalWorkerDeploymentPatch, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) { + blockOwnerDeletion := true + + ownerRef := metav1.OwnerReference{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalWorkerDeployment", + Name: workerDeployment.GetName(), + UID: workerDeployment.GetUID(), + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: nil, + } + + patch.SetOwnerReferences(append(patch.GetOwnerReferences(), ownerRef)) +}