Commit a3fb18b

Use factory start to fill caches instead of separate informers
Signed-off-by: Yuriy Losev <yuriy.losev@flant.com>
1 parent ffd18e3

6 files changed (+29, -33 lines)
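The change is the same in each component: rather than running every informer on its own goroutine and waiting for each cache separately, the shared informer factory is started once and all caches are synced with a single call. A minimal sketch of the new startup sequence, assuming a client-go SharedInformerFactory; the package and function names and the zero resync period are illustrative, not taken from the diff:

    package startup

    import (
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/klog/v2"
    )

    // startInformers registers informers through one shared factory, starts them
    // together, and waits for every cache to fill before the component proceeds.
    func startInformers(kubeClient kubernetes.Interface, stopCh <-chan struct{}) {
        factory := informers.NewSharedInformerFactory(kubeClient, 0)

        // Informers are created lazily by whoever asks the factory for them,
        // e.g. a lister obtained via factory.Core().V1().LimitRanges().Lister().
        _ = factory.Core().V1().LimitRanges().Lister()

        // Start runs every informer registered above; WaitForCacheSync reports,
        // per informer type, whether its cache synced before stopCh was closed.
        factory.Start(stopCh)
        for informerType, synced := range factory.WaitForCacheSync(stopCh) {
            if !synced {
                klog.V(0).InfoS("Initial sync failed", "kind", informerType)
            }
        }
    }

In the diffs below, the three binaries (admission-controller, recommender, updater) gain this start-and-sync block in their main/run functions, while controller_fetcher.go, fetcher.go, and limit_range_calculator.go drop the per-informer loops they used to run themselves.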

vertical-pod-autoscaler/pkg/admission-controller/main.go

Lines changed: 10 additions & 2 deletions
@@ -116,6 +116,16 @@ func main() {
     recommendationProvider := recommendation.NewProvider(limitRangeCalculator, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator))
     vpaMatcher := vpa.NewMatcher(vpaLister, targetSelectorFetcher, controllerFetcher)

+    stopCh := make(chan struct{})
+    defer close(stopCh)
+    factory.Start(stopCh)
+    informerMap := factory.WaitForCacheSync(stopCh)
+    for informerType, synced := range informerMap {
+        if !synced {
+            klog.V(0).InfoS("Initial sync failed", "kind", informerType)
+        }
+    }
+
     hostname, err := os.Hostname()
     if err != nil {
         klog.ErrorS(err, "Unable to get hostname")
@@ -126,15 +136,13 @@ func main() {
     if namespace != "" {
         statusNamespace = namespace
     }
-    stopCh := make(chan struct{})
     statusUpdater := status.NewUpdater(
         kubeClient,
         status.AdmissionControllerStatusName,
         statusNamespace,
         statusUpdateInterval,
         hostname,
     )
-    defer close(stopCh)

     calculators := []patch.Calculator{patch.NewResourceUpdatesCalculator(recommendationProvider), patch.NewObservedContainersCalculator()}
     as := logic.NewAdmissionServer(podPreprocessor, vpaPreprocessor, limitRangeCalculator, vpaMatcher, calculators)

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 8 additions & 0 deletions
@@ -232,6 +232,14 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
     controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
     podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, commonFlag.VpaObjectNamespace, stopCh)

+    factory.Start(stopCh)
+    informerMap := factory.WaitForCacheSync(stopCh)
+    for informerType, synced := range informerMap {
+        if !synced {
+            klog.V(0).InfoS("Initial sync failed", "kind", informerType)
+        }
+    }
+
     model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp))

     useCheckpoints := *storage != "prometheus"

vertical-pod-autoscaler/pkg/target/controller_fetcher/controller_fetcher.go

Lines changed: 0 additions & 11 deletions
@@ -136,17 +136,6 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,
         cronJob: factory.Batch().V1().CronJobs().Informer(),
     }

-    for kind, informer := range informersMap {
-        stopCh := make(chan struct{})
-        go informer.Run(stopCh)
-        synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
-        if !synced {
-            klog.V(0).InfoS("Initial sync failed", "kind", kind)
-        } else {
-            klog.InfoS("Initial sync completed", "kind", kind)
-        }
-    }
-
     scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
     return &controllerFetcher{
         scaleNamespacer: scaleNamespacer,

vertical-pod-autoscaler/pkg/target/fetcher.go

Lines changed: 0 additions & 12 deletions
@@ -91,18 +91,6 @@ func NewVpaTargetSelectorFetcher(config *rest.Config, kubeClient kube_client.Int
         cronJob: factory.Batch().V1().CronJobs().Informer(),
     }

-    for kind, informer := range informersMap {
-        stopCh := make(chan struct{})
-        go informer.Run(stopCh)
-        synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
-        if !synced {
-            klog.ErrorS(nil, "Could not sync cache for "+string(kind))
-            klog.FlushAndExit(klog.ExitFlushTimeout, 1)
-        } else {
-            klog.InfoS("Initial sync completed", "kind", kind)
-        }
-    }
-
     scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
     return &vpaTargetSelectorFetcher{
         scaleNamespacer: scaleNamespacer,

vertical-pod-autoscaler/pkg/updater/main.go

Lines changed: 11 additions & 0 deletions
@@ -172,6 +172,8 @@ func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConf
 }

 func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
+    stopCh := make(chan struct{})
+    defer close(stopCh)
     config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
     kubeClient := kube_client.NewForConfigOrDie(config)
     vpaClient := vpa_clientset.NewForConfigOrDie(config)
@@ -184,6 +186,15 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
         klog.ErrorS(err, "Failed to create limitRangeCalculator, falling back to not checking limits")
         limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
     }
+
+    factory.Start(stopCh)
+    informerMap := factory.WaitForCacheSync(stopCh)
+    for informerType, synced := range informerMap {
+        if !synced {
+            klog.V(0).InfoS("Initial sync failed", "kind", informerType)
+        }
+    }
+
     admissionControllerStatusNamespace := status.AdmissionControllerStatusNamespace
     if namespace != "" {
         admissionControllerStatusNamespace = namespace

vertical-pod-autoscaler/pkg/utils/limitrange/limit_range_calculator.go

Lines changed: 0 additions & 8 deletions
@@ -24,7 +24,6 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/client-go/informers"
     listers "k8s.io/client-go/listers/core/v1"
-    "k8s.io/client-go/tools/cache"
 )

 // LimitRangeCalculator calculates limit range items that has the same effect as all limit range items present in the cluster.
@@ -55,13 +54,6 @@ func NewLimitsRangeCalculator(f informers.SharedInformerFactory) (*limitsChecker
         return nil, fmt.Errorf("NewLimitsRangeCalculator requires a SharedInformerFactory but got nil")
     }
     limitRangeLister := f.Core().V1().LimitRanges().Lister()
-    stopCh := make(chan struct{})
-    informer := f.Core().V1().LimitRanges().Informer()
-    go informer.Run(stopCh)
-    ok := cache.WaitForCacheSync(stopCh, informer.HasSynced)
-    if !ok {
-        return nil, fmt.Errorf("informer did not sync")
-    }
     return &limitsChecker{limitRangeLister}, nil
 }
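Because NewLimitsRangeCalculator no longer blocks on its own informer, the returned calculator's lister stays empty until the caller starts the shared factory, which is exactly what the main.go changes above now do. A hedged usage sketch of the calling side; kubeClient, stopCh, and defaultResyncPeriod stand in for values the calling component already has:

    // Build one factory, hand it to the calculator, then start and sync it.
    factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod)

    var limitRangeCalculator limitrange.LimitRangeCalculator
    limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(factory)
    if err != nil {
        klog.ErrorS(err, "Failed to create limitRangeCalculator, falling back to not checking limits")
        limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
    }

    // The LimitRange lister inside the calculator only returns data after the
    // factory has been started and its caches have synced.
    factory.Start(stopCh)
    for informerType, synced := range factory.WaitForCacheSync(stopCh) {
        if !synced {
            klog.V(0).InfoS("Initial sync failed", "kind", informerType)
        }
    }
    // limitRangeCalculator is then passed on (e.g. to the admission server or
    // updater) exactly as before this commit.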
