Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions pkg/dependenciesdistributor/dependencies_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (d *DependenciesDistributor) handleDependentResource(
return err
}
attachedBinding := buildAttachedBinding(independentBinding, rawObject)
return d.createOrUpdateAttachedBinding(attachedBinding)
return d.createOrUpdateAttachedBinding(ctx, attachedBinding)
case dependent.LabelSelector != nil:
var selector labels.Selector
var err error
Expand All @@ -375,7 +375,7 @@ func (d *DependenciesDistributor) handleDependentResource(
}
for _, rawObject := range rawObjects {
attachedBinding := buildAttachedBinding(independentBinding, rawObject)
if err := d.createOrUpdateAttachedBinding(attachedBinding); err != nil {
if err := d.createOrUpdateAttachedBinding(ctx, attachedBinding); err != nil {
return err
}
}
Expand Down Expand Up @@ -573,10 +573,10 @@ func (d *DependenciesDistributor) removeScheduleResultFromAttachedBindings(bindi
return utilerrors.NewAggregate(errs)
}

func (d *DependenciesDistributor) createOrUpdateAttachedBinding(attachedBinding *workv1alpha2.ResourceBinding) error {
func (d *DependenciesDistributor) createOrUpdateAttachedBinding(ctx context.Context, attachedBinding *workv1alpha2.ResourceBinding) error {
existBinding := &workv1alpha2.ResourceBinding{}
bindingKey := client.ObjectKeyFromObject(attachedBinding)
err := d.Client.Get(context.TODO(), bindingKey, existBinding)
err := d.Client.Get(ctx, bindingKey, existBinding)
if err == nil {
// If this binding exists and its owner is not the input object, return error and let garbage collector
// delete this binding and try again later. See https://github.com/karmada-io/karmada/issues/6034.
Expand All @@ -585,17 +585,32 @@ func (d *DependenciesDistributor) createOrUpdateAttachedBinding(attachedBinding
"try again later after binding is garbage collected, see https://github.com/karmada-io/karmada/issues/6034", bindingKey)
}

mergedRequiredBy := mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy)

// If the spec.Placement is nil, this means that existBinding is generated by the dependency mechanism.
// If the spec.Placement is not nil, then it must be generated by PropagationPolicy.
if existBinding.Spec.Placement == nil {
// Pre-fetch referencing ResourceBindings once and reuse for conflict checks.
rbs, err := d.resolveResourceBindingFromSnapshots(ctx, mergedRequiredBy)
if err != nil {
klog.Errorf("Failed to resolve referencing ResourceBindings for (%s): %v", bindingKey, err)
return err
}

crConflictDetected := d.conflictDetectedForConflictResolution(rbs)
preserveConflictDetected := d.conflictDetectedForPreserveOnDeletion(rbs)

d.recordEventIfPolicyConflict(existBinding, crConflictDetected, preserveConflictDetected)

existBinding.Spec.ConflictResolution = attachedBinding.Spec.ConflictResolution
}
existBinding.Spec.RequiredBy = mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy)

existBinding.Spec.RequiredBy = mergedRequiredBy
existBinding.Labels = util.DedupeAndMergeLabels(existBinding.Labels, attachedBinding.Labels)
existBinding.Spec.Resource = attachedBinding.Spec.Resource
existBinding.Spec.PreserveResourcesOnDeletion = attachedBinding.Spec.PreserveResourcesOnDeletion

if err := d.Client.Update(context.TODO(), existBinding); err != nil {
if err := d.Client.Update(ctx, existBinding); err != nil {
klog.Errorf("Failed to update resourceBinding(%s): %v", bindingKey, err)
return err
}
Expand All @@ -607,7 +622,7 @@ func (d *DependenciesDistributor) createOrUpdateAttachedBinding(attachedBinding
return err
}

return d.Client.Create(context.TODO(), attachedBinding)
return d.Client.Create(ctx, attachedBinding)
}

// Start runs the distributor, never stop until context canceled.
Expand Down
173 changes: 169 additions & 4 deletions pkg/dependenciesdistributor/dependencies_distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"testing"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -42,6 +44,7 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
Expand Down Expand Up @@ -2523,7 +2526,15 @@ func Test_createOrUpdateAttachedBinding(t *testing.T) {
},
},
}
return fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb).Build()

// Add the referenced ResourceBindings that will be looked up during conflict checks.
ref1 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-1", Namespace: "default-1"}}
ref2 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-2", Namespace: "default-2"}}
ref3 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-3", Namespace: "default-3"}}
ref4 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "test-binding-1", Namespace: "test-1"}}
ref5 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "test-binding-2", Namespace: "test-2"}}

return fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb, ref1, ref2, ref3, ref4, ref5).Build()
},
},
{
Expand Down Expand Up @@ -2826,7 +2837,15 @@ func Test_createOrUpdateAttachedBinding(t *testing.T) {
},
},
}
return fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb).Build()

// Referenced ResourceBindings for conflict checks
ref1 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-1", Namespace: "default-1"}}
ref2 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-2", Namespace: "default-2"}}
ref3 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-3", Namespace: "default-3"}}
ref4 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "test-binding-1", Namespace: "test-1"}}
ref5 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "test-binding-2", Namespace: "test-2"}}

return fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb, ref1, ref2, ref3, ref4, ref5).Build()
},
},
{
Expand Down Expand Up @@ -2933,7 +2952,12 @@ func Test_createOrUpdateAttachedBinding(t *testing.T) {
},
},
}
return fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb).Build()

// Referenced ResourceBindings for conflict checks
ref1 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "default-binding-1", Namespace: "default-1"}}
ref2 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "test-binding-1", Namespace: "test-1"}}

return fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb, ref1, ref2).Build()
},
},
}
Expand All @@ -2942,7 +2966,7 @@ func Test_createOrUpdateAttachedBinding(t *testing.T) {
d := &DependenciesDistributor{
Client: tt.setupClient(),
}
err := d.createOrUpdateAttachedBinding(tt.attachedBinding)
err := d.createOrUpdateAttachedBinding(context.TODO(), tt.attachedBinding)
if (err != nil) != tt.wantErr {
t.Errorf("createOrUpdateAttachedBinding() error = %v, wantErr %v", err, tt.wantErr)
}
Expand All @@ -2962,6 +2986,87 @@ func Test_createOrUpdateAttachedBinding(t *testing.T) {
}
}

func Test_createOrUpdateAttachedBinding_emitsConflictEvent(t *testing.T) {
Scheme := runtime.NewScheme()
utilruntime.Must(scheme.AddToScheme(Scheme))
utilruntime.Must(workv1alpha2.Install(Scheme))

// Existing attached binding to be updated (generated by dependency mechanism: Placement == nil)
exist := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "test-binding",
Namespace: "test",
ResourceVersion: "1000",
OwnerReferences: []metav1.OwnerReference{{
UID: "uid-1",
Controller: ptr.To[bool](true),
}},
},
Spec: workv1alpha2.ResourceBindingSpec{},
}

// Referenced ResourceBindings with conflicting policies
rb1 := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "rb-1", Namespace: "ns-a"},
Spec: workv1alpha2.ResourceBindingSpec{
PreserveResourcesOnDeletion: ptr.To(true),
ConflictResolution: policyv1alpha1.ConflictOverwrite,
},
}
rb2 := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "rb-2", Namespace: "ns-b"},
Spec: workv1alpha2.ResourceBindingSpec{
PreserveResourcesOnDeletion: ptr.To(false),
},
}

attached := &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "test-binding",
Namespace: "test",
OwnerReferences: []metav1.OwnerReference{{
UID: "uid-1",
Controller: ptr.To[bool](true),
}},
},
Spec: workv1alpha2.ResourceBindingSpec{
Resource: workv1alpha2.ObjectReference{APIVersion: "apps/v1", Kind: "Deployment", Namespace: "fake-ns", Name: "demo-app", ResourceVersion: "1"},
RequiredBy: []workv1alpha2.BindingSnapshot{
{Namespace: "ns-a", Name: "rb-1"},
{Namespace: "ns-b", Name: "rb-2"},
},
},
}

fakeRec := record.NewFakeRecorder(10)
d := &DependenciesDistributor{
Client: fake.NewClientBuilder().WithScheme(Scheme).WithObjects(exist, rb1, rb2).Build(),
EventRecorder: fakeRec,
}

if err := d.createOrUpdateAttachedBinding(context.TODO(), attached); err != nil {
t.Fatalf("createOrUpdateAttachedBinding() error = %v", err)
}

select {
case e := <-fakeRec.Events:
if !strings.Contains(e, corev1.EventTypeWarning) {
t.Fatalf("expected Warning event, got %q", e)
}
if !strings.Contains(e, events.EventReasonDependencyPolicyConflict) {
t.Fatalf("expected reason %q in event, got %q", events.EventReasonDependencyPolicyConflict, e)
}
if !strings.Contains(e, "ConflictResolution conflicted (Overwrite vs Abort)") {
t.Fatalf("expected ConflictResolution conflicted hint in event, got %q", e)
}
if !strings.Contains(e, "PreserveResourcesOnDeletion conflicted (true vs false)") {
t.Fatalf("expected PreserveResourcesOnDeletion conflicted hint in event, got %q", e)
}
case <-time.After(1 * time.Second):
t.Fatalf("expected dependency policy conflict event, but none received")
}
}

func Test_buildAttachedBinding(t *testing.T) {
blockOwnerDeletion := true
isController := true
Expand Down Expand Up @@ -3432,3 +3537,63 @@ func Test_deleteBindingFromSnapshot(t *testing.T) {
})
}
}

func Test_resolveResourceBindingFromSnapshots_returnsBindings(t *testing.T) {
Scheme := runtime.NewScheme()
utilruntime.Must(scheme.AddToScheme(Scheme))
utilruntime.Must(workv1alpha2.Install(Scheme))

rb1 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "rb-1", Namespace: "ns-a"}}
rb2 := &workv1alpha2.ResourceBinding{ObjectMeta: metav1.ObjectMeta{Name: "rb-2", Namespace: "ns-b"}}

d := &DependenciesDistributor{
Client: fake.NewClientBuilder().WithScheme(Scheme).WithObjects(rb1, rb2).Build(),
}

snaps := []workv1alpha2.BindingSnapshot{
{Namespace: "ns-a", Name: "rb-1"},
{Namespace: "ns-b", Name: "rb-2"},
}

got, err := d.resolveResourceBindingFromSnapshots(context.TODO(), snaps)
if err != nil {
t.Fatalf("resolveResourceBindingFromSnapshots() error = %v", err)
}
if len(got) != 2 {
t.Fatalf("resolveResourceBindingFromSnapshots() len = %d, want 2", len(got))
}
if got[0].Name != "rb-1" || got[0].Namespace != "ns-a" {
t.Fatalf("resolveResourceBindingFromSnapshots()[0] = %s/%s, want ns-a/rb-1", got[0].Namespace, got[0].Name)
}
if got[1].Name != "rb-2" || got[1].Namespace != "ns-b" {
t.Fatalf("resolveResourceBindingFromSnapshots()[1] = %s/%s, want ns-b/rb-2", got[1].Namespace, got[1].Name)
}
}

func Test_conflictDetectedForConflictResolution_detectsConflict(t *testing.T) {
d := &DependenciesDistributor{}

rbs := []*workv1alpha2.ResourceBinding{
{Spec: workv1alpha2.ResourceBindingSpec{ConflictResolution: policyv1alpha1.ConflictOverwrite}},
{Spec: workv1alpha2.ResourceBindingSpec{}},
}

has := d.conflictDetectedForConflictResolution(rbs)
if !has {
t.Fatalf("conflictDetectedForConflictResolution() = false, want true")
}
}

func Test_conflictDetectedForPreserveOnDeletion_detectsConflict(t *testing.T) {
d := &DependenciesDistributor{}

rbs := []*workv1alpha2.ResourceBinding{
{Spec: workv1alpha2.ResourceBindingSpec{PreserveResourcesOnDeletion: ptr.To(true)}},
{Spec: workv1alpha2.ResourceBindingSpec{PreserveResourcesOnDeletion: ptr.To(false)}},
}

has := d.conflictDetectedForPreserveOnDeletion(rbs)
if !has {
t.Fatalf("conflictDetectedForPreserveOnDeletion() = false, want true")
}
}
Loading