Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions golang/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
"strconv"
"time"
"strings"
"errors"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
Expand Down Expand Up @@ -85,6 +85,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {
var datapointsRequiredScaleUp, datapointsRequiredScaleDown int64
var upThreshold, downThreshold float64
var scaleDownMinIterAgeMins int64
var minShardCount int64
var dryRun = true
// Note: Investigate envconfig (https://github.com/kelseyhightower/envconfig) to simplify this environment variable section to have less boilerplate.
periodMins, err := strconv.ParseInt(os.Getenv("SCALE_PERIOD_MINS"), 10, 64)
Expand Down Expand Up @@ -175,6 +176,14 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {
logger.WithError(err).Error(logMessage)
errorHandler(err, logMessage, "", false)
}
minShardCount, err = strconv.ParseInt(os.Getenv("MIN_SHARD_COUNT"), 10, 64)
if err != nil {
// Default minimum shard count
minShardCount = 1
logMessage := "Error reading the MIN_SHARD_COUNT environment variable. Stream will scale down to a minimum of 1 shard."
logger.WithError(err).Error(logMessage)
errorHandler(err, logMessage, "", false)
}
dryRun, err = strconv.ParseBool(os.Getenv("DRY_RUN"))
if err != nil {
// Default dryRun value is true. In case of error while reading this variable, dryRun will be set to true to be on the safer side.
Expand Down Expand Up @@ -216,7 +225,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {
}
scaleUpAlarmName, scaleDownAlarmName, currentAlarmAction, lastScaledTimestamp = parseAlarmNameAndTags(*response, currentAlarmName)
logger = logger.WithField("ScaleAction", currentAlarmAction)
if (currentAlarmAction == "") {
if currentAlarmAction == "" {
logMessage := fmt.Sprintf("Scaling event was rejected. Could not parse triggering alarm name (%s), should end in -scale-up or -scale-down", currentAlarmName)
err = errors.New(logMessage)
logger.WithError(err).Error(logMessage)
Expand Down Expand Up @@ -261,7 +270,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) {
return
}
currentShardCount = *((*streamSummary.StreamDescriptionSummary).OpenShardCount)
newShardCount, downThreshold = calculateNewShardCount(currentAlarmAction, downThreshold, currentShardCount)
newShardCount, downThreshold = calculateNewShardCount(currentAlarmAction, downThreshold, currentShardCount, minShardCount)
logger = logger.WithField("CurrentShardCount", currentShardCount).WithField("TargetShardCount", newShardCount)
if dryRun {
logger.Info("This is dry run. Will not scale the stream.")
Expand Down Expand Up @@ -582,7 +591,7 @@ func setAlarmState(alarmName string, state string, reason string) (*cloudwatch.S
// scaleAction: The scaling action. Possible values are Up and Down
// downThreshold: The current scaling down threshold. This will be set to -1.0 if the new shard count turns out to be 1
// currentShardCount: The current open shards in the Kinesis stream
func calculateNewShardCount(scaleAction string, downThreshold float64, currentShardCount int64) (int64, float64) {
func calculateNewShardCount(scaleAction string, downThreshold float64, currentShardCount int64, minShardCount int64) (int64, float64) {
var targetShardCount int64
if scaleAction == "Up" {
targetShardCount = currentShardCount * 2
Expand All @@ -591,8 +600,8 @@ func calculateNewShardCount(scaleAction string, downThreshold float64, currentSh
if scaleAction == "Down" {
targetShardCount = currentShardCount / 2
// Set to minimum shard count
if targetShardCount <= 1 {
targetShardCount = 1
if targetShardCount <= minShardCount {
targetShardCount = minShardCount
// At minimum shard count,set the scale down threshold to -1, so that scale down alarm remains in OK state
downThreshold = -1.0
}
Expand Down Expand Up @@ -631,11 +640,11 @@ func parseAlarmNameAndTags(listTags cloudwatch.ListTagsForResourceOutput, curren
var scaleDownSuffix = "-scale-down"
var scaleUpSuffix = "-scale-up"

if (strings.HasSuffix(currentAlarmName, scaleUpSuffix)) {
if strings.HasSuffix(currentAlarmName, scaleUpSuffix) {
currentAlarmAction = "Up"
scaleUpAlarmName = currentAlarmName
scaleDownAlarmName = currentAlarmName[0:len(currentAlarmName)-len(scaleUpSuffix)] + scaleDownSuffix
} else if (strings.HasSuffix(currentAlarmName, scaleDownSuffix)) {
} else if strings.HasSuffix(currentAlarmName, scaleDownSuffix) {
currentAlarmAction = "Down"
scaleUpAlarmName = currentAlarmName[0:len(currentAlarmName)-len(scaleDownSuffix)] + scaleUpSuffix
scaleDownAlarmName = currentAlarmName
Expand Down