diff --git a/golang/scale.go b/golang/scale.go index 2a4ee3b..0da323b 100644 --- a/golang/scale.go +++ b/golang/scale.go @@ -10,13 +10,13 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "math/rand" "os" "strconv" - "time" "strings" - "errors" + "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" @@ -85,6 +85,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) { var datapointsRequiredScaleUp, datapointsRequiredScaleDown int64 var upThreshold, downThreshold float64 var scaleDownMinIterAgeMins int64 + var minShardCount int64 var dryRun = true // Note: Investigate envconfig (https://github.com/kelseyhightower/envconfig) to simplify this environment variable section to have less boilerplate. periodMins, err := strconv.ParseInt(os.Getenv("SCALE_PERIOD_MINS"), 10, 64) @@ -175,6 +176,14 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) { logger.WithError(err).Error(logMessage) errorHandler(err, logMessage, "", false) } + minShardCount, err = strconv.ParseInt(os.Getenv("MIN_SHARD_COUNT"), 10, 64) + if err != nil { + // Default minimum shard count + minShardCount = 1 + logMessage := "Error reading the MIN_SHARD_COUNT environment variable. Stream will scale down to a minimum of 1 shard." + logger.WithError(err).Error(logMessage) + errorHandler(err, logMessage, "", false) + } dryRun, err = strconv.ParseBool(os.Getenv("DRY_RUN")) if err != nil { // Default dryRun value is true. In case of error while reading this variable, dryRun will be set to true to be on the safer side. @@ -216,7 +225,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) { } scaleUpAlarmName, scaleDownAlarmName, currentAlarmAction, lastScaledTimestamp = parseAlarmNameAndTags(*response, currentAlarmName) logger = logger.WithField("ScaleAction", currentAlarmAction) - if (currentAlarmAction == "") { + if currentAlarmAction == "" { logMessage := fmt.Sprintf("Scaling event was rejected. Could not parse triggering alarm name (%s), should end in -scale-up or -scale-down", currentAlarmName) err = errors.New(logMessage) logger.WithError(err).Error(logMessage) @@ -261,7 +270,7 @@ func handleRequest(_ context.Context, snsEvent events.SNSEvent) { return } currentShardCount = *((*streamSummary.StreamDescriptionSummary).OpenShardCount) - newShardCount, downThreshold = calculateNewShardCount(currentAlarmAction, downThreshold, currentShardCount) + newShardCount, downThreshold = calculateNewShardCount(currentAlarmAction, downThreshold, currentShardCount, minShardCount) logger = logger.WithField("CurrentShardCount", currentShardCount).WithField("TargetShardCount", newShardCount) if dryRun { logger.Info("This is dry run. Will not scale the stream.") @@ -582,7 +591,7 @@ func setAlarmState(alarmName string, state string, reason string) (*cloudwatch.S // scaleAction: The scaling action. Possible values are Up and Down // downThreshold: The current scaling down threshold. This will be set to -1.0 if the new shard count turns out to be 1 // currentShardCount: The current open shards in the Kinesis stream -func calculateNewShardCount(scaleAction string, downThreshold float64, currentShardCount int64) (int64, float64) { +func calculateNewShardCount(scaleAction string, downThreshold float64, currentShardCount int64, minShardCount int64) (int64, float64) { var targetShardCount int64 if scaleAction == "Up" { targetShardCount = currentShardCount * 2 @@ -591,8 +600,8 @@ func calculateNewShardCount(scaleAction string, downThreshold float64, currentSh if scaleAction == "Down" { targetShardCount = currentShardCount / 2 // Set to minimum shard count - if targetShardCount <= 1 { - targetShardCount = 1 + if targetShardCount <= minShardCount { + targetShardCount = minShardCount // At minimum shard count,set the scale down threshold to -1, so that scale down alarm remains in OK state downThreshold = -1.0 } @@ -631,11 +640,11 @@ func parseAlarmNameAndTags(listTags cloudwatch.ListTagsForResourceOutput, curren var scaleDownSuffix = "-scale-down" var scaleUpSuffix = "-scale-up" - if (strings.HasSuffix(currentAlarmName, scaleUpSuffix)) { + if strings.HasSuffix(currentAlarmName, scaleUpSuffix) { currentAlarmAction = "Up" scaleUpAlarmName = currentAlarmName scaleDownAlarmName = currentAlarmName[0:len(currentAlarmName)-len(scaleUpSuffix)] + scaleDownSuffix - } else if (strings.HasSuffix(currentAlarmName, scaleDownSuffix)) { + } else if strings.HasSuffix(currentAlarmName, scaleDownSuffix) { currentAlarmAction = "Down" scaleUpAlarmName = currentAlarmName[0:len(currentAlarmName)-len(scaleDownSuffix)] + scaleUpSuffix scaleDownAlarmName = currentAlarmName