Skip to content

Commit 9519fdb

Browse files
authored
fix(cluster): add a check for the unfinished update (#539)
* fix(cluster): add a check for the unfinished update * fix(cluster): The lebel switching time is too long when updating the pods add timeout for get sqlrunner.
1 parent 5be7c46 commit 9519fdb

File tree

3 files changed

+82
-24
lines changed

3 files changed

+82
-24
lines changed

internal/sql_runner.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,15 @@ func NewSQLRunner(cfg *Config, errs ...error) (SQLRunner, closeFunc, error) {
139139
if err != nil {
140140
return nil, close, err
141141
}
142+
db.SetConnMaxIdleTime(10 * time.Second)
143+
db.SetConnMaxLifetime(1 * time.Minute)
144+
if err := db.Ping(); err != nil {
145+
internalLog.V(1).Info("failed to ping mysql", "error", err)
146+
if cErr := db.Close(); cErr != nil {
147+
internalLog.Error(cErr, "failed closing the database connection")
148+
}
149+
return nil, close, err
150+
}
142151

143152
// Close connection function.
144153
close = func() {

mysqlcluster/syncer/statefulset.go

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"time"
2424

2525
"github.com/go-logr/logr"
26-
"github.com/go-test/deep"
2726
"github.com/iancoleman/strcase"
2827
"github.com/imdario/mergo"
2928
"github.com/presslabs/controller-util/mergo/transformers"
@@ -34,7 +33,9 @@ import (
3433
"k8s.io/apimachinery/pkg/api/equality"
3534
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3635
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36+
"k8s.io/apimachinery/pkg/labels"
3737
"k8s.io/apimachinery/pkg/runtime"
38+
"k8s.io/apimachinery/pkg/selection"
3839
"k8s.io/apimachinery/pkg/types"
3940
"k8s.io/apimachinery/pkg/util/wait"
4041
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -277,17 +278,18 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
277278
}
278279
// Check if statefulset changed.
279280
if !s.sfsUpdated(existing) {
280-
return controllerutil.OperationResultNone, nil
281-
}
282-
283-
// If changed, update statefulset.
284-
if err := s.cli.Update(ctx, s.sfs); err != nil {
285-
return controllerutil.OperationResultNone, err
286-
}
287-
// Need roll update.
288-
if !equality.Semantic.DeepEqual(existing.Spec.Template, s.sfs.Spec.Template) {
289-
s.log.Info("update statefulset pods", "name", s.Name, "diff", deep.Equal(existing.Spec.Template, s.sfs.Spec.Template))
290-
// Update every pods of statefulset.
281+
if s.podsAllUpdated(ctx) {
282+
return controllerutil.OperationResultNone, nil
283+
} else {
284+
if err := s.updatePod(ctx); err != nil {
285+
return controllerutil.OperationResultNone, err
286+
}
287+
}
288+
} else {
289+
// If changed, update statefulset.
290+
if err := s.cli.Update(ctx, s.sfs); err != nil {
291+
return controllerutil.OperationResultNone, err
292+
}
291293
if err := s.updatePod(ctx); err != nil {
292294
return controllerutil.OperationResultNone, err
293295
}
@@ -302,7 +304,7 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
302304
// updatePod update the pods, update follower nodes first.
303305
// This can reduce the number of master-slave switching during the update process.
304306
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
305-
// updatedRevision will not update with the currentRevision when using `onDelete`.
307+
// currentRevision will not update with the updatedRevision when using `onDelete`.
306308
// https://github.com/kubernetes/kubernetes/pull/106059
307309
if s.sfs.Status.UpdatedReplicas == *s.sfs.Spec.Replicas {
308310
return nil
@@ -333,7 +335,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
333335
var leaderPod corev1.Pod
334336
for _, pod := range pods.Items {
335337
// Check if the pod is healthy.
336-
err := wait.PollImmediate(time.Second*2, time.Minute, func() (bool, error) {
338+
err := wait.PollImmediate(time.Second*2, time.Second*30, func() (bool, error) {
337339
s.cli.Get(ctx, client.ObjectKeyFromObject(&pod), &pod)
338340
if pod.ObjectMeta.Labels["healthy"] == "yes" {
339341
return true, nil
@@ -489,14 +491,17 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error {
489491
}
490492

491493
func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
494+
if s.sfs.Status.UpdateRevision == "" {
495+
return fmt.Errorf("update revision is empty")
496+
}
492497
// Check version, if not latest, delete node.
493498
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
494499
s.log.Info("pod is already updated", "pod name", pod.Name)
495500
} else {
496501
s.Status.State = apiv1alpha1.ClusterUpdateState
497-
s.log.Info("updating pod", "pod", pod.Name, "key", s.Unwrap())
502+
s.log.Info("updating pod", "pod", pod.Name)
498503
// Try to delete pod and wait for pod restart.
499-
err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) {
504+
err := wait.PollImmediate(time.Second*5, time.Minute*2, func() (bool, error) {
500505
if err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil {
501506
return false, nil
502507
}
@@ -585,7 +590,30 @@ func (s *StatefulSetSyncer) backupIsRunning(ctx context.Context) (bool, error) {
585590

586591
// Updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy' are forbidden.
587592
func (s *StatefulSetSyncer) sfsUpdated(existing *appsv1.StatefulSet) bool {
588-
return existing.Spec.Replicas != s.sfs.Spec.Replicas ||
593+
return *existing.Spec.Replicas != *s.sfs.Spec.Replicas ||
589594
!equality.Semantic.DeepEqual(existing.Spec.Template, s.sfs.Spec.Template) ||
590595
existing.Spec.UpdateStrategy != s.sfs.Spec.UpdateStrategy
591596
}
597+
598+
func (s *StatefulSetSyncer) podsAllUpdated(ctx context.Context) bool {
599+
podlist := corev1.PodList{}
600+
labelSelector := s.GetLabels().AsSelector()
601+
// Find the pods that revision is old.
602+
r, err := labels.NewRequirement("controller-revision-hash", selection.NotEquals, []string{s.sfs.Status.UpdateRevision})
603+
if err != nil {
604+
s.log.V(1).Info("failed to create label requirement", "error", err)
605+
return false
606+
}
607+
labelSelector = labelSelector.Add(*r)
608+
if err := s.cli.List(ctx,
609+
&podlist,
610+
&client.ListOptions{
611+
Namespace: s.sfs.Namespace,
612+
LabelSelector: labelSelector,
613+
},
614+
); err != nil {
615+
s.log.V(1).Info("failed to list pods", "error", err)
616+
return false
617+
}
618+
return len(podlist.Items) == 0
619+
}

mysqlcluster/syncer/status.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ func (s *StatusSyncer) AutoRebuild(ctx context.Context, pod *corev1.Pod) error {
232232

233233
// updateNodeStatus update the node status.
234234
func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
235+
closeCh := make(chan func())
235236
for _, pod := range pods {
236237
podName := pod.Name
237238
host := fmt.Sprintf("%s.%s.%s", podName, s.GetNameForResource(utils.HeadlessSVC), s.Namespace)
@@ -245,13 +246,33 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
245246
}
246247

247248
isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
248-
sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
249-
s.cli, s.MysqlCluster.GetClusterKey(), utils.OperatorUser, host))
250-
defer closeConn()
251-
if err != nil {
252-
s.log.V(1).Info("failed to connect the mysql", "node", node.Name, "error", err)
253-
node.Message = err.Error()
254-
} else {
249+
var sqlRunner internal.SQLRunner
250+
var closeConn func()
251+
errCh := make(chan error)
252+
go func(sqlRunner *internal.SQLRunner, errCh chan error, closeCh chan func()) {
253+
var err error
254+
*sqlRunner, closeConn, err = s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
255+
s.cli, s.MysqlCluster.GetClusterKey(), utils.OperatorUser, host))
256+
if err != nil {
257+
s.log.V(1).Info("failed to get sql runner", "node", node.Name, "error", err)
258+
errCh <- err
259+
return
260+
}
261+
if closeConn != nil {
262+
closeCh <- closeConn
263+
return
264+
}
265+
errCh <- nil
266+
}(&sqlRunner, errCh, closeCh)
267+
268+
var err error
269+
select {
270+
case <-errCh:
271+
case closeConn := <-closeCh:
272+
defer closeConn()
273+
case <-time.After(time.Second * 5):
274+
}
275+
if sqlRunner != nil {
255276
isLagged, isReplicating, err = internal.CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry)
256277
if err != nil {
257278
s.log.V(1).Info("failed to check slave status", "node", node.Name, "error", err)

0 commit comments

Comments
 (0)