Skip to content

Commit 82f5378

Browse files
authored
Adding CancelDrainTask to ASG Termination Events to close generated heartbeat when failing to process event (#1173)
1 parent 5641359 commit 82f5378

File tree

6 files changed

+59
-7
lines changed

6 files changed

+59
-7
lines changed

pkg/interruptionevent/draincordon/handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ func (h *Handler) HandleEvent(drainEvent *monitor.InterruptionEvent) error {
111111
}
112112

113113
if err != nil {
114+
if drainEvent.CancelDrainTask != nil {
115+
h.commonHandler.RunCancelDrainTask(nodeName, drainEvent)
116+
}
114117
h.commonHandler.InterruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
115118
} else {
116119
h.commonHandler.InterruptionEventStore.MarkAllAsProcessed(nodeName)

pkg/interruptionevent/internal/common/handler.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ func (h *Handler) RunPreDrainTask(nodeName string, drainEvent *monitor.Interrupt
5555
h.Metrics.NodeActionsInc("pre-drain", nodeName, drainEvent.EventID, err)
5656
}
5757

58+
func (h *Handler) RunCancelDrainTask(nodeName string, drainEvent *monitor.InterruptionEvent) {
59+
err := drainEvent.CancelDrainTask(*drainEvent, h.Node)
60+
if err != nil {
61+
log.Err(err).Msg("There was a problem executing the early exit task")
62+
h.Recorder.Emit(nodeName, observability.Warning, observability.CancelDrainErrReason, observability.CancelDrainErrMsgFmt, err.Error())
63+
} else {
64+
h.Recorder.Emit(nodeName, observability.Normal, observability.CancelDrainReason, observability.CancelDrainMsg)
65+
}
66+
}
67+
5868
func (h *Handler) RunPostDrainTask(nodeName string, drainEvent *monitor.InterruptionEvent) {
5969
err := drainEvent.PostDrainTask(*drainEvent, h.Node)
6070
if err != nil {

pkg/monitor/sqsevent/asg-lifecycle-event.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m
9999
}
100100

101101
stopHeartbeatCh := make(chan struct{})
102+
cancelHeartbeatCh := make(chan struct{})
102103

103104
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, _ node.Node) error {
104105

@@ -111,13 +112,18 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m
111112
close(stopHeartbeatCh)
112113
return m.deleteMessage(message)
113114
}
115+
116+
interruptionEvent.CancelDrainTask = func(_ monitor.InterruptionEvent, _ node.Node) error {
117+
close(cancelHeartbeatCh)
118+
return nil
119+
}
114120

115121
interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
116122
nthConfig := n.GetNthConfig()
117123
// If only HeartbeatInterval is set, HeartbeatUntil will default to 172800.
118124
if nthConfig.HeartbeatInterval != -1 && nthConfig.HeartbeatUntil != -1 {
119125
go m.checkHeartbeatTimeout(nthConfig.HeartbeatInterval, lifecycleDetail)
120-
go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh)
126+
go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh, cancelHeartbeatCh)
121127
}
122128

123129
err := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID)
@@ -167,13 +173,20 @@ func (m SQSMonitor) checkHeartbeatTimeout(heartbeatInterval int, lifecycleDetail
167173
}
168174

169175
// Issue lifecycle heartbeats to reset the heartbeat timeout timer in ASG
170-
func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}) {
176+
func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}, cancelCh <-chan struct{}) {
171177
ticker := time.NewTicker(time.Duration(heartbeatInterval) * time.Second)
172178
defer ticker.Stop()
173179
timeout := time.After(time.Duration(heartbeatUntil) * time.Second)
174180

175181
for {
176182
select {
183+
case <-cancelCh:
184+
log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName).
185+
Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
186+
Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken).
187+
Str("instanceID", lifecycleDetail.EC2InstanceID).
188+
Msg("Failed to cordon and drain the node, stopping heartbeat")
189+
return
177190
case <-stopCh:
178191
log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName).
179192
Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).

pkg/monitor/sqsevent/sqs-monitor_test.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ func TestMonitor_EventBridgeSuccess(t *testing.T) {
188188
h.Equals(t, result.NodeName, dnsNodeName)
189189
h.Assert(t, result.PostDrainTask != nil, "PostDrainTask should have been set")
190190
h.Assert(t, result.PreDrainTask != nil, "PreDrainTask should have been set")
191+
if event.ID == asgLifecycleEvent.ID { h.Assert(t, result.CancelDrainTask != nil, "CancelDrainTask should have been set") }
191192
err = result.PostDrainTask(result, node.Node{})
192193
h.Ok(t, err)
193194
default:
@@ -273,6 +274,7 @@ func TestMonitor_AsgDirectToSqsSuccess(t *testing.T) {
273274
h.Equals(t, result.NodeName, dnsNodeName)
274275
h.Assert(t, result.PostDrainTask != nil, "PostDrainTask should have been set")
275276
h.Assert(t, result.PreDrainTask != nil, "PreDrainTask should have been set")
277+
h.Assert(t, result.CancelDrainTask != nil, "CancelDrainTask should have been set")
276278
err = result.PostDrainTask(result, node.Node{})
277279
h.Ok(t, err)
278280
default:
@@ -365,6 +367,7 @@ func TestMonitor_DrainTasks(t *testing.T) {
365367
h.Equals(st, result.NodeName, dnsNodeName)
366368
h.Assert(st, result.PostDrainTask != nil, "PostDrainTask should have been set")
367369
h.Assert(st, result.PreDrainTask != nil, "PreDrainTask should have been set")
370+
if event.ID == asgLifecycleEvent.ID { h.Assert(t, result.CancelDrainTask != nil, "CancelDrainTask should have been set") }
368371
err := result.PostDrainTask(result, node.Node{})
369372
h.Ok(st, err)
370373
})
@@ -466,6 +469,7 @@ func TestMonitor_DrainTasks_Errors(t *testing.T) {
466469
h.Equals(t, result.NodeName, dnsNodeName)
467470
h.Assert(t, result.PostDrainTask != nil, "PostDrainTask should have been set")
468471
h.Assert(t, result.PreDrainTask != nil, "PreDrainTask should have been set")
472+
if i == 1 { h.Assert(t, result.CancelDrainTask != nil, "CancelDrainTask should have been set") }
469473
err := result.PostDrainTask(result, node.Node{})
470474
h.Ok(t, err)
471475
default:
@@ -909,32 +913,39 @@ func TestMonitor_InstanceNotManaged(t *testing.T) {
909913
}
910914

911915
func TestSendHeartbeats_EarlyClosure(t *testing.T) {
912-
err := heartbeatTestHelper(nil, 3500, 1, 5)
916+
err := heartbeatTestHelper(nil, 3500, 1, 5, false)
913917
h.Ok(t, err)
914918
h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount)
915919
}
916920

917921
func TestSendHeartbeats_HeartbeatUntilExpire(t *testing.T) {
918-
err := heartbeatTestHelper(nil, 8000, 1, 5)
922+
err := heartbeatTestHelper(nil, 8000, 1, 5, false)
919923
h.Ok(t, err)
920924
h.Assert(t, h.HeartbeatCallCount == 5, "5 Heartbeat Expected, got %d", h.HeartbeatCallCount)
921925
}
922926

923927
func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) {
924928
RecordLifecycleActionHeartbeatErr := awserr.New("Throttling", "Rate exceeded", nil)
925-
err := heartbeatTestHelper(RecordLifecycleActionHeartbeatErr, 8000, 1, 6)
929+
err := heartbeatTestHelper(RecordLifecycleActionHeartbeatErr, 8000, 1, 6, false)
926930
h.Ok(t, err)
927931
h.Assert(t, h.HeartbeatCallCount == 6, "6 Heartbeat Expected, got %d", h.HeartbeatCallCount)
928932
}
929933

930934
func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) {
931935
RecordLifecycleActionHeartbeatErr := awserr.New("ValidationError", "No active Lifecycle Action found", nil)
932-
err := heartbeatTestHelper(RecordLifecycleActionHeartbeatErr, 6000, 1, 4)
936+
err := heartbeatTestHelper(RecordLifecycleActionHeartbeatErr, 6000, 1, 4, false)
933937
h.Ok(t, err)
934938
h.Assert(t, h.HeartbeatCallCount == 1, "1 Heartbeat Expected, got %d", h.HeartbeatCallCount)
935939
}
936940

937-
func heartbeatTestHelper(RecordLifecycleActionHeartbeatErr error, sleepMilliSeconds int, heartbeatInterval int, heartbeatUntil int) error {
941+
942+
func TestSendHeartbeats_CancelHeartbeat(t *testing.T) {
943+
err := heartbeatTestHelper(nil, 6000, 1, 4, true)
944+
h.Ok(t, err)
945+
h.Assert(t, h.HeartbeatCallCount == 2, "2 Heartbeat Expected, got %d", h.HeartbeatCallCount)
946+
}
947+
948+
func heartbeatTestHelper(RecordLifecycleActionHeartbeatErr error, sleepMilliSeconds int, heartbeatInterval int, heartbeatUntil int, cancelDrain bool) error {
938949
h.HeartbeatCallCount = 0
939950

940951
msg, err := getSQSMessageFromEvent(asgLifecycleEvent)
@@ -986,6 +997,16 @@ func heartbeatTestHelper(RecordLifecycleActionHeartbeatErr error, sleepMilliSeco
986997
return err
987998
}
988999

1000+
if cancelDrain == true {
1001+
if result.CancelDrainTask == nil {
1002+
return fmt.Errorf("CancelDrainTask should have been set")
1003+
}
1004+
time.Sleep(2100 * time.Millisecond)
1005+
if err := result.CancelDrainTask(result, *testNode); err != nil {
1006+
return err
1007+
}
1008+
}
1009+
9891010
if result.PostDrainTask == nil {
9901011
return fmt.Errorf("PostDrainTask should have been set")
9911012
}

pkg/monitor/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type InterruptionEvent struct {
6161
InProgress bool
6262
PreDrainTask DrainTask `json:"-"`
6363
PostDrainTask DrainTask `json:"-"`
64+
CancelDrainTask DrainTask `json:"-"`
6465
}
6566

6667
// TimeUntilEvent returns the duration until the event start time

pkg/observability/k8s-events.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ const (
5656
PostDrainErrMsgFmt = "There was a problem executing the post-drain task: %s"
5757
PostDrainReason = "PostDrain"
5858
PostDrainMsg = "Post-drain task successfully executed"
59+
CancelDrainErrReason = "CancelDrainError"
60+
CancelDrainErrMsgFmt = "There was a problem executing the early exit task: %s"
61+
CancelDrainReason = "CancelDrain"
62+
CancelDrainMsg = "Early exit task successfully executed"
5963
)
6064

6165
// Interruption event reasons

0 commit comments

Comments
 (0)