
Commit dd40212

Merge pull request #8259 from yalosev/fix/factory-start
[VPA] Use factory start to fill caches instead of separate informers
2 parents: ffd18e3 + 1332499
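
For orientation, here is a minimal, self-contained sketch of the shared-informer-factory pattern this commit standardizes on, in contrast to the per-informer informer.Run + cache.WaitForCacheSync loops it deletes below. The kubeconfig wiring and the choice of Deployments/LimitRanges informers are illustrative assumptions, not code from this commit:

package main

import (
	"fmt"
	"os"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative setup: build a clientset from $KUBECONFIG.
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	kubeClient := kubernetes.NewForConfigOrDie(config)
	factory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute)

	// Informers must be requested before Start is called: Start only
	// launches informers that have already been created, which is why
	// the VPA binaries build their listers/informers first and only
	// then start the factory and wait for sync.
	_ = factory.Apps().V1().Deployments().Informer()
	_ = factory.Core().V1().LimitRanges().Informer()

	stopCh := make(chan struct{})
	defer close(stopCh)

	// One call starts every registered informer on its own goroutine;
	// one call blocks until all caches sync, reporting per-type
	// success as a map[reflect.Type]bool.
	factory.Start(stopCh)
	for kind, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			fmt.Fprintf(os.Stderr, "could not sync cache for %s informer\n", kind)
			os.Exit(1)
		}
	}
}

Compared with running each informer on a private goroutine and channel, a single factory start deduplicates watch connections and leaves one place to detect a failed initial sync.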

File tree: 7 files changed (+37, −33)

vertical-pod-autoscaler/pkg/admission-controller/main.go

Lines changed: 11 additions & 2 deletions
@@ -116,6 +116,17 @@ func main() {
 	recommendationProvider := recommendation.NewProvider(limitRangeCalculator, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator))
 	vpaMatcher := vpa.NewMatcher(vpaLister, targetSelectorFetcher, controllerFetcher)
 
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	factory.Start(stopCh)
+	informerMap := factory.WaitForCacheSync(stopCh)
+	for kind, synced := range informerMap {
+		if !synced {
+			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
+			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		}
+	}
+
 	hostname, err := os.Hostname()
 	if err != nil {
 		klog.ErrorS(err, "Unable to get hostname")
@@ -126,15 +137,13 @@
 	if namespace != "" {
 		statusNamespace = namespace
 	}
-	stopCh := make(chan struct{})
 	statusUpdater := status.NewUpdater(
 		kubeClient,
 		status.AdmissionControllerStatusName,
 		statusNamespace,
 		statusUpdateInterval,
 		hostname,
 	)
-	defer close(stopCh)
 
 	calculators := []patch.Calculator{patch.NewResourceUpdatesCalculator(recommendationProvider), patch.NewObservedContainersCalculator()}
 	as := logic.NewAdmissionServer(podPreprocessor, vpaPreprocessor, limitRangeCalculator, vpaMatcher, calculators)
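
Note the relocation inside main(): stopCh is now created (and its close deferred) up front, before factory.Start, rather than later next to the status updater as before. The stop channel has to exist before the informer goroutines launch and must stay open for the lifetime of the process, so hoisting it is a prerequisite for the factory-based sync.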

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 10 additions & 0 deletions
@@ -19,6 +19,7 @@ package main
 import (
 	"context"
 	"flag"
+	"fmt"
 	"os"
 	"strings"
 	"time"
@@ -232,6 +233,15 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
 	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
 	podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, commonFlag.VpaObjectNamespace, stopCh)
 
+	factory.Start(stopCh)
+	informerMap := factory.WaitForCacheSync(stopCh)
+	for kind, synced := range informerMap {
+		if !synced {
+			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
+			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		}
+	}
+
 	model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp))
 
 	useCheckpoints := *storage != "prometheus"

vertical-pod-autoscaler/pkg/target/controller_fetcher/controller_fetcher.go

Lines changed: 0 additions & 11 deletions
@@ -136,17 +136,6 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,
 		cronJob: factory.Batch().V1().CronJobs().Informer(),
 	}
 
-	for kind, informer := range informersMap {
-		stopCh := make(chan struct{})
-		go informer.Run(stopCh)
-		synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
-		if !synced {
-			klog.V(0).InfoS("Initial sync failed", "kind", kind)
-		} else {
-			klog.InfoS("Initial sync completed", "kind", kind)
-		}
-	}
-
 	scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
 	return &controllerFetcher{
 		scaleNamespacer: scaleNamespacer,

vertical-pod-autoscaler/pkg/target/fetcher.go

Lines changed: 0 additions & 12 deletions
@@ -91,18 +91,6 @@ func NewVpaTargetSelectorFetcher(config *rest.Config, kubeClient kube_client.Int
 		cronJob: factory.Batch().V1().CronJobs().Informer(),
 	}
 
-	for kind, informer := range informersMap {
-		stopCh := make(chan struct{})
-		go informer.Run(stopCh)
-		synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
-		if !synced {
-			klog.ErrorS(nil, "Could not sync cache for "+string(kind))
-			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
-		} else {
-			klog.InfoS("Initial sync completed", "kind", kind)
-		}
-	}
-
 	scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
 	return &vpaTargetSelectorFetcher{
 		scaleNamespacer: scaleNamespacer,

vertical-pod-autoscaler/pkg/updater/main.go

Lines changed: 13 additions & 0 deletions
@@ -19,6 +19,7 @@ package main
 import (
 	"context"
 	"flag"
+	"fmt"
 	"os"
 	"strings"
 	"time"
@@ -172,6 +173,8 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf
 }
 
 func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
+	stopCh := make(chan struct{})
+	defer close(stopCh)
 	config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
 	kubeClient := kube_client.NewForConfigOrDie(config)
 	vpaClient := vpa_clientset.NewForConfigOrDie(config)
@@ -184,6 +187,16 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
 		klog.ErrorS(err, "Failed to create limitRangeCalculator, falling back to not checking limits")
 		limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
 	}
+
+	factory.Start(stopCh)
+	informerMap := factory.WaitForCacheSync(stopCh)
+	for kind, synced := range informerMap {
+		if !synced {
+			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
+			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		}
+	}
+
 	admissionControllerStatusNamespace := status.AdmissionControllerStatusNamespace
 	if namespace != "" {
 		admissionControllerStatusNamespace = namespace

vertical-pod-autoscaler/pkg/utils/limitrange/limit_range_calculator.go

Lines changed: 0 additions & 8 deletions
@@ -24,7 +24,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/informers"
 	listers "k8s.io/client-go/listers/core/v1"
-	"k8s.io/client-go/tools/cache"
 )
 
 // LimitRangeCalculator calculates limit range items that has the same effect as all limit range items present in the cluster.
@@ -55,13 +54,6 @@ func NewLimitsRangeCalculator(f informers.SharedInformerFactory) (*limitsChecker
 		return nil, fmt.Errorf("NewLimitsRangeCalculator requires a SharedInformerFactory but got nil")
 	}
 	limitRangeLister := f.Core().V1().LimitRanges().Lister()
-	stopCh := make(chan struct{})
-	informer := f.Core().V1().LimitRanges().Informer()
-	go informer.Run(stopCh)
-	ok := cache.WaitForCacheSync(stopCh, informer.HasSynced)
-	if !ok {
-		return nil, fmt.Errorf("informer did not sync")
-	}
 	return &limitsChecker{limitRangeLister}, nil
 }
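
With its private sync loop removed, NewLimitsRangeCalculator no longer blocks or fails on cache sync; it only wires up a lister, and the calculator serves empty results until the caches are warm. A hypothetical caller illustrating the contract this leaves behind (the function name and the kubeClient/stopCh parameters are placeholders; imports of k8s.io/client-go/informers, k8s.io/client-go/kubernetes, and the VPA limitrange package are assumed):

func newWarmCalculator(kubeClient kubernetes.Interface, stopCh <-chan struct{}) (limitrange.LimitRangeCalculator, error) {
	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	// The constructor now only errors when the factory is nil; its
	// Lister() call registers the LimitRanges informer as a side effect.
	calc, err := limitrange.NewLimitsRangeCalculator(factory)
	if err != nil {
		return nil, err
	}
	// Cache syncing is the caller's responsibility, mirroring what the
	// three main.go files above now do.
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	return calc, nil
}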

vertical-pod-autoscaler/pkg/utils/limitrange/limit_range_calculator_test.go

Lines changed: 3 additions & 0 deletions
@@ -138,6 +138,9 @@ func TestGetContainerLimitRangeItem(t *testing.T) {
 			cs := fake.NewSimpleClientset(tc.limitRanges...)
 			factory := informers.NewSharedInformerFactory(cs, 0)
 			lc, err := NewLimitsRangeCalculator(factory)
+
+			factory.Start(t.Context().Done())
+			_ = factory.WaitForCacheSync(t.Context().Done())
 			if assert.NoError(t, err) {
 				limitRange, err := lc.GetContainerLimitRangeItem(testNamespace)
 				if tc.expectedErr == nil {
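
Two details worth noting in the test change: factory.Start runs after NewLimitsRangeCalculator, which works because the constructor's Lister() call has already registered the LimitRanges informer with the factory; and t.Context() (new in Go 1.24) supplies a Done() channel that closes when the test finishes, so the informer goroutines are stopped without a hand-rolled stopCh.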
