Skip to content

Commit f4c93f0

Browse files
committed
Support re-engaging a cluster in multicluster manager when the kubeconfig is changed.
Signed-off-by: Shingo Omura <everpeace@gmail.com>
1 parent 0f887e8 commit f4c93f0

File tree

1 file changed

+141
-55
lines changed

1 file changed

+141
-55
lines changed

providers/cluster-inventory-api/provider.go

Lines changed: 141 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package clusterinventoryapi
1919
import (
2020
"context"
2121
"fmt"
22+
"reflect"
2223
"sync"
2324
"time"
2425

@@ -30,15 +31,18 @@ import (
3031
apierrors "k8s.io/apimachinery/pkg/api/errors"
3132
"k8s.io/apimachinery/pkg/api/meta"
3233
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/apimachinery/pkg/types"
3335
"k8s.io/client-go/rest"
3436
"k8s.io/client-go/tools/clientcmd"
3537

3638
"sigs.k8s.io/controller-runtime/pkg/builder"
3739
"sigs.k8s.io/controller-runtime/pkg/client"
3840
"sigs.k8s.io/controller-runtime/pkg/cluster"
3941
"sigs.k8s.io/controller-runtime/pkg/controller"
42+
"sigs.k8s.io/controller-runtime/pkg/handler"
4043
"sigs.k8s.io/controller-runtime/pkg/log"
4144
"sigs.k8s.io/controller-runtime/pkg/manager"
45+
"sigs.k8s.io/controller-runtime/pkg/predicate"
4246
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4347

4448
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
@@ -47,15 +51,66 @@ import (
4751

4852
var _ multicluster.Provider = &Provider{}
4953

50-
var (
51-
// GetKubeConfigFromSecret is a function that fetches the kubeconfig for a ClusterProfile from Secret
52-
// It supposes that the Secret is managed by following "Push Model via Credentials in Secret" in "KEP-4322: ClusterProfile API"
53-
// ref: https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/4322-cluster-inventory/README.md#push-model-via-credentials-in-secret-not-recommended
54-
GetKubeConfigFromSecret = func(ctx context.Context, cli client.Client, consumerName string, clp *clusterinventoryv1alpha1.ClusterProfile) (*rest.Config, error) {
54+
const (
55+
labelKeyClusterInventoryConsumer = "x-k8s.io/cluster-inventory-consumer"
56+
labelKeyClusterProfile = "x-k8s.io/cluster-profile"
57+
)
58+
59+
// Options are the options for the Cluster-API cluster Provider.
60+
type Options struct {
61+
// ConsumerName is the name of the consumer that will use the cluster inventory API.
62+
ConsumerName string
63+
64+
// ClusterOptions are the options passed to the cluster constructor.
65+
ClusterOptions []cluster.Option
66+
67+
// GetKubeConfig is a function that returns the kubeconfig secret for a cluster profile.
68+
GetKubeConfig func(ctx context.Context, cli client.Client, clp *clusterinventoryv1alpha1.ClusterProfile) (*rest.Config, error)
69+
70+
// NewCluster is a function that creates a new cluster from a rest.Config.
71+
// The cluster will be started by the provider.
72+
NewCluster func(ctx context.Context, clp *clusterinventoryv1alpha1.ClusterProfile, cfg *rest.Config, opts ...cluster.Option) (cluster.Cluster, error)
73+
74+
// CustomWatches can add custom watches to the provider controller
75+
CustomWatches []CustomWatch
76+
}
77+
78+
// CustomWatch specifies a custom watch spec that can be added to the provider controller.
79+
type CustomWatch struct {
80+
Object client.Object
81+
EventHandler handler.TypedEventHandler[client.Object, reconcile.Request]
82+
Opts []builder.WatchesOption
83+
}
84+
85+
type index struct {
86+
object client.Object
87+
field string
88+
extractValue client.IndexerFunc
89+
}
90+
91+
// Provider is a cluster Provider that works with Cluster Inventory API.
92+
type Provider struct {
93+
opts Options
94+
log logr.Logger
95+
client client.Client
96+
97+
lock sync.RWMutex
98+
mcMgr mcmanager.Manager
99+
clusters map[string]cluster.Cluster
100+
cancelFns map[string]context.CancelFunc
101+
kubeconfig map[string]*rest.Config
102+
indexers []index
103+
}
104+
105+
// GetKubeConfigFromSecret returns a function that fetches the kubeconfig for a specified consumer for ClusterProfile from Secret
106+
// It supposes that the Secrets for ClusterProfiles are managed by following "Push Model via Credentials in Secret" in "KEP-4322: ClusterProfile API"
107+
// ref: https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/4322-cluster-inventory/README.md#push-model-via-credentials-in-secret-not-recommended
108+
func GetKubeConfigFromSecret(consumerName string) func(ctx context.Context, cli client.Client, clp *clusterinventoryv1alpha1.ClusterProfile) (*rest.Config, error) {
109+
return func(ctx context.Context, cli client.Client, clp *clusterinventoryv1alpha1.ClusterProfile) (*rest.Config, error) {
55110
secrets := corev1.SecretList{}
56111
if err := cli.List(ctx, &secrets, client.InNamespace(clp.Namespace), client.MatchingLabels{
57-
"x-k8s.io/cluster-inventory-consumer": consumerName,
58-
"x-k8s.io/cluster-profile": clp.Name,
112+
labelKeyClusterInventoryConsumer: consumerName,
113+
labelKeyClusterProfile: clp.Name,
59114
}); err != nil {
60115
return nil, fmt.Errorf("failed to list secrets: %w", err)
61116
}
@@ -76,27 +131,50 @@ var (
76131
}
77132
return clientcmd.RESTConfigFromKubeConfig(data)
78133
}
79-
)
80-
81-
// Options are the options for the Cluster-API cluster Provider.
82-
type Options struct {
83-
// ConsumerName is the name of the consumer that will use the cluster inventory API.
84-
ConsumerName string
134+
}
85135

86-
// ClusterOptions are the options passed to the cluster constructor.
87-
ClusterOptions []cluster.Option
136+
// WatchKubeConfigSecret returns a CustomWatch that watches for kubeconfig secrets for specified consumer of ClusterProfile
137+
// It supposes that the Secrets for ClusterProfiles are managed by following "Push Model via Credentials in Secret" in "KEP-4322: ClusterProfile API"
138+
// ref: https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/4322-cluster-inventory/README.md#push-model-via-credentials-in-secret-not-recommended
139+
func WatchKubeConfigSecret(consumerName string) CustomWatch {
140+
return CustomWatch{
141+
Object: &corev1.Secret{},
142+
EventHandler: handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
143+
secret, ok := obj.(*corev1.Secret)
144+
if !ok {
145+
return nil
146+
}
88147

89-
// GetKubeConfig is a function that returns the kubeconfig secret for a cluster profile.
90-
GetKubeConfig func(ctx context.Context, cli client.Client, consumerName string, clp *clusterinventoryv1alpha1.ClusterProfile) (*rest.Config, error)
148+
if secret.GetLabels() == nil ||
149+
secret.GetLabels()[labelKeyClusterInventoryConsumer] != consumerName ||
150+
secret.GetLabels()[labelKeyClusterProfile] == "" {
151+
return nil
152+
}
91153

92-
// NewCluster is a function that creates a new cluster from a rest.Config.
93-
// The cluster will be started by the provider.
94-
NewCluster func(ctx context.Context, clp *clusterinventoryv1alpha1.ClusterProfile, cfg *rest.Config, opts ...cluster.Option) (cluster.Cluster, error)
154+
return []reconcile.Request{{
155+
NamespacedName: types.NamespacedName{
156+
Namespace: secret.GetNamespace(),
157+
Name: secret.GetLabels()[labelKeyClusterProfile],
158+
},
159+
}}
160+
}),
161+
Opts: []builder.WatchesOption{
162+
builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
163+
secret, ok := object.(*corev1.Secret)
164+
if !ok {
165+
return false
166+
}
167+
return secret.GetLabels()[labelKeyClusterInventoryConsumer] == consumerName &&
168+
secret.GetLabels()[labelKeyClusterProfile] != ""
169+
})),
170+
},
171+
}
95172
}
96173

97174
func setDefaults(opts *Options, cli client.Client) {
98175
if opts.GetKubeConfig == nil {
99-
opts.GetKubeConfig = GetKubeConfigFromSecret
176+
opts.GetKubeConfig = GetKubeConfigFromSecret(opts.ConsumerName)
177+
opts.CustomWatches = append(opts.CustomWatches, WatchKubeConfigSecret(opts.ConsumerName))
100178
}
101179
if opts.NewCluster == nil {
102180
opts.NewCluster = func(ctx context.Context, clp *clusterinventoryv1alpha1.ClusterProfile, cfg *rest.Config, opts ...cluster.Option) (cluster.Cluster, error) {
@@ -108,44 +186,38 @@ func setDefaults(opts *Options, cli client.Client) {
108186
// New creates a new Cluster Inventory API cluster Provider.
109187
func New(localMgr manager.Manager, opts Options) (*Provider, error) {
110188
p := &Provider{
111-
opts: opts,
112-
log: log.Log.WithName("cluster-inventory-api-cluster-provider"),
113-
client: localMgr.GetClient(),
114-
clusters: map[string]cluster.Cluster{},
115-
cancelFns: map[string]context.CancelFunc{},
189+
opts: opts,
190+
log: log.Log.WithName("cluster-inventory-api-cluster-provider"),
191+
client: localMgr.GetClient(),
192+
clusters: map[string]cluster.Cluster{},
193+
cancelFns: map[string]context.CancelFunc{},
194+
kubeconfig: map[string]*rest.Config{},
116195
}
117196

118197
setDefaults(&p.opts, p.client)
119198

120-
if err := builder.ControllerManagedBy(localMgr).
199+
// Create a controller builder
200+
controllerBuilder := builder.ControllerManagedBy(localMgr).
121201
For(&clusterinventoryv1alpha1.ClusterProfile{}).
122-
WithOptions(controller.Options{MaxConcurrentReconciles: 1}). // no prallelism.
123-
Complete(p); err != nil {
202+
WithOptions(controller.Options{MaxConcurrentReconciles: 1}) // no parallelism.
203+
204+
// Apply any custom watches provided by the user
205+
for _, customWatch := range p.opts.CustomWatches {
206+
controllerBuilder.Watches(
207+
customWatch.Object,
208+
customWatch.EventHandler,
209+
customWatch.Opts...,
210+
)
211+
}
212+
213+
// Complete the controller setup
214+
if err := controllerBuilder.Complete(p); err != nil {
124215
return nil, fmt.Errorf("failed to create controller: %w", err)
125216
}
126217

127218
return p, nil
128219
}
129220

130-
type index struct {
131-
object client.Object
132-
field string
133-
extractValue client.IndexerFunc
134-
}
135-
136-
// Provider is a cluster Provider that works with Cluster Inventory API.
137-
type Provider struct {
138-
opts Options
139-
log logr.Logger
140-
client client.Client
141-
142-
lock sync.RWMutex
143-
mcMgr mcmanager.Manager
144-
clusters map[string]cluster.Cluster
145-
cancelFns map[string]context.CancelFunc
146-
indexers []index
147-
}
148-
149221
// Get returns the cluster with the given name, if it is known.
150222
func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster, error) {
151223
p.lock.RLock()
@@ -206,12 +278,6 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
206278
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
207279
}
208280

209-
// already engaged?
210-
if _, ok := p.clusters[key]; ok {
211-
log.Info("ClusterProfile already engaged")
212-
return reconcile.Result{}, nil
213-
}
214-
215281
// ready?
216282
controlPlaneHealthyCondition := meta.FindStatusCondition(clp.Status.Conditions, clusterinventoryv1alpha1.ClusterConditionControlPlaneHealthy)
217283
if controlPlaneHealthyCondition == nil || controlPlaneHealthyCondition.Status != metav1.ConditionTrue {
@@ -220,12 +286,30 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
220286
}
221287

222288
// get kubeconfig
223-
cfg, err := p.opts.GetKubeConfig(ctx, p.client, p.opts.ConsumerName, clp)
289+
cfg, err := p.opts.GetKubeConfig(ctx, p.client, clp)
224290
if err != nil {
225291
log.Error(err, "Failed to get kubeconfig for ClusterProfile")
226292
return reconcile.Result{}, fmt.Errorf("failed to get kubeconfig for ClusterProfile=%s: %w", key, err)
227293
}
228294

295+
// already engaged and kubeconfig is not changed?
296+
if _, ok := p.clusters[key]; ok {
297+
if p.kubeconfig[key] != nil && reflect.DeepEqual(p.kubeconfig[key], cfg) {
298+
log.Info("ClusterProfile already engaged and kubeconfig is unchanged, skipping")
299+
return reconcile.Result{}, nil
300+
}
301+
302+
log.Info("ClusterProfile already engaged but kubeconfig is changed, re-engaging the ClusterProfile")
303+
// disengage existing cluster first if it exists.
304+
if cancel, ok := p.cancelFns[key]; ok {
305+
log.V(3).Info("Cancelling existing context for ClusterProfile")
306+
cancel()
307+
delete(p.clusters, key)
308+
delete(p.cancelFns, key)
309+
delete(p.kubeconfig, key)
310+
}
311+
}
312+
229313
// create cluster.
230314
cl, err := p.opts.NewCluster(ctx, clp, cfg, p.opts.ClusterOptions...)
231315
if err != nil {
@@ -253,6 +337,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
253337
// remember.
254338
p.clusters[key] = cl
255339
p.cancelFns[key] = cancel
340+
p.kubeconfig[key] = cfg
256341

257342
log.Info("Added new cluster for ClusterProfile")
258343

@@ -261,6 +346,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
261346
log.Error(err, "failed to engage manager for ClusterProfile")
262347
delete(p.clusters, key)
263348
delete(p.cancelFns, key)
349+
delete(p.kubeconfig, key)
264350
return reconcile.Result{}, err
265351
}
266352

0 commit comments

Comments
 (0)