Skip to content

Commit ea03569

Browse files
authored
Use AWS SDK to delete volumes during cluster down (#1954)
1 parent e7bacc5 commit ea03569

File tree

9 files changed

+172
-45
lines changed

9 files changed

+172
-45
lines changed

cli/cmd/cluster.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/aws/aws-sdk-go/service/autoscaling"
28+
"github.com/aws/aws-sdk-go/service/ec2"
2829
"github.com/aws/aws-sdk-go/service/elbv2"
2930
"github.com/cortexlabs/cortex/cli/cluster"
3031
"github.com/cortexlabs/cortex/cli/types/cliconfig"
@@ -421,6 +422,17 @@ var _clusterDownCmd = &cobra.Command{
421422
case clusterstate.StatusNotFound:
422423
exit.Error(clusterstate.ErrorClusterDoesNotExist(accessConfig.ClusterName, accessConfig.Region))
423424
case clusterstate.StatusDeleteComplete:
425+
// silently clean up
426+
awsClient.DeleteQueuesWithPrefix(clusterconfig.SQSNamePrefix(accessConfig.ClusterName))
427+
awsClient.DeletePolicy(clusterconfig.DefaultPolicyARN(accountID, accessConfig.ClusterName, accessConfig.Region))
428+
if !_flagClusterDownKeepVolumes {
429+
volumes, err := listPVCVolumesForCluster(awsClient, accessConfig.ClusterName)
430+
if err == nil {
431+
for _, volume := range volumes {
432+
awsClient.DeleteVolume(*volume.VolumeId)
433+
}
434+
}
435+
}
424436
exit.Error(clusterstate.ErrorClusterAlreadyDeleted(accessConfig.ClusterName, accessConfig.Region))
425437
}
426438
}
@@ -437,7 +449,7 @@ var _clusterDownCmd = &cobra.Command{
437449
fmt.Print("○ deleting sqs queues ")
438450
err = awsClient.DeleteQueuesWithPrefix(clusterconfig.SQSNamePrefix(accessConfig.ClusterName))
439451
if err != nil {
440-
fmt.Printf("\n\nfailed to delete all sqs queues; please delete queues starting with the name %s via the cloudwatch console: https://%s.console.aws.amazon.com/sqs/v2/home", clusterconfig.SQSNamePrefix(accessConfig.ClusterName), accessConfig.Region)
452+
fmt.Printf("\n\nfailed to delete all sqs queues; please delete queues starting with the name %s via the sqs console: https://%s.console.aws.amazon.com/sqs/v2/home\n", clusterconfig.SQSNamePrefix(accessConfig.ClusterName), accessConfig.Region)
441453
errors.PrintError(err)
442454
fmt.Println()
443455
} else {
@@ -446,12 +458,7 @@ var _clusterDownCmd = &cobra.Command{
446458

447459
fmt.Print("○ spinning down the cluster ...")
448460

449-
uninstallCmd := "/root/uninstall.sh"
450-
if _flagClusterDownKeepVolumes {
451-
uninstallCmd += " --keep-volumes"
452-
}
453-
454-
out, exitCode, err := runManagerAccessCommand(uninstallCmd, *accessConfig, awsClient, nil, nil)
461+
out, exitCode, err := runManagerAccessCommand("/root/uninstall.sh", *accessConfig, awsClient, nil, nil)
455462
if err != nil {
456463
errors.PrintError(err)
457464
fmt.Println()
@@ -466,13 +473,41 @@ var _clusterDownCmd = &cobra.Command{
466473
fmt.Printf("○ deleting auto-generated iam policy %s ", policyARN)
467474
err = awsClient.DeletePolicy(policyARN)
468475
if err != nil {
469-
fmt.Printf("\n\nfailed to delete auto-generated cortex policy %s; please delete the policy via the iam console: https://us-west-2.console.aws.amazon.com/iam/home#/policies", policyARN)
476+
fmt.Printf("\n\nfailed to delete auto-generated cortex policy %s; please delete the policy via the iam console: https://console.aws.amazon.com/iam/home#/policies\n", policyARN)
470477
errors.PrintError(err)
471478
fmt.Println()
472479
} else {
473480
fmt.Println("✓")
474481
}
475482

483+
// delete EBS volumes
484+
if !_flagClusterDownKeepVolumes {
485+
volumes, err := listPVCVolumesForCluster(awsClient, accessConfig.ClusterName)
486+
if err != nil {
487+
fmt.Println("\nfailed to list volumes for deletion; please delete any volumes associated with your cluster via the ec2 console: https://console.aws.amazon.com/ec2/v2/home?#Volumes")
488+
errors.PrintError(err)
489+
fmt.Println()
490+
} else {
491+
fmt.Print("○ deleting ebs volumes ")
492+
var failedToDeleteVolumes []string
493+
var lastErr error
494+
for _, volume := range volumes {
495+
err := awsClient.DeleteVolume(*volume.VolumeId)
496+
if err != nil {
497+
failedToDeleteVolumes = append(failedToDeleteVolumes, *volume.VolumeId)
498+
lastErr = err
499+
}
500+
}
501+
if lastErr != nil {
502+
fmt.Printf("\n\nfailed to delete %s %s; please delete %s via the ec2 console: https://console.aws.amazon.com/ec2/v2/home?#Volumes\n", s.PluralS("volume", len(failedToDeleteVolumes)), s.UserStrsAnd(failedToDeleteVolumes), s.PluralCustom("it", "them", len(failedToDeleteVolumes)))
503+
errors.PrintError(lastErr)
504+
fmt.Println()
505+
} else {
506+
fmt.Println("✓")
507+
}
508+
}
509+
}
510+
476511
// best-effort deletion of cli environment(s)
477512
if loadBalancer != nil {
478513
envNames, isDefaultEnv, _ := getEnvNamesByOperatorEndpoint(*loadBalancer.DNSName)
@@ -1018,3 +1053,10 @@ func getAWSOperatorLoadBalancer(clusterName string, awsClient *aws.Client) (*elb
10181053

10191054
return loadBalancer, nil
10201055
}
1056+
1057+
func listPVCVolumesForCluster(awsClient *aws.Client, clusterName string) ([]ec2.Volume, error) {
1058+
return awsClient.ListVolumes(ec2.Tag{
1059+
Key: pointer.String(fmt.Sprintf("kubernetes.io/cluster/%s", clusterName)),
1060+
Value: nil, // any value should be ok as long as the key is present
1061+
})
1062+
}

cli/cmd/cluster_gcp.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ var _clusterGCPUpCmd = &cobra.Command{
144144
exit.Error(err)
145145
}
146146

147-
gkeClusterName := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", clusterConfig.Project, clusterConfig.Zone, clusterConfig.ClusterName)
147+
fullyQualifiedClusterName := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", clusterConfig.Project, clusterConfig.Zone, clusterConfig.ClusterName)
148148

149-
clusterExists, err := gcpClient.ClusterExists(gkeClusterName)
149+
clusterExists, err := gcpClient.ClusterExists(fullyQualifiedClusterName)
150150
if err != nil {
151151
exit.Error(err)
152152
}
@@ -169,7 +169,7 @@ var _clusterGCPUpCmd = &cobra.Command{
169169
exit.Error(err)
170170
}
171171

172-
operatorLoadBalancerIP, err := getGCPOperatorLoadBalancerIP(gkeClusterName, gcpClient)
172+
operatorLoadBalancerIP, err := getGCPOperatorLoadBalancerIP(fullyQualifiedClusterName, gcpClient)
173173
if err != nil {
174174
exit.Error(errors.Append(err, fmt.Sprintf("\n\nyou can attempt to resolve this issue and configure your cli environment by running `cortex cluster info --configure-env %s`", _flagClusterGCPUpEnv)))
175175
}
@@ -244,10 +244,10 @@ var _clusterGCPDownCmd = &cobra.Command{
244244
exit.Error(err)
245245
}
246246

247-
gkeClusterName := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", accessConfig.Project, accessConfig.Zone, accessConfig.ClusterName)
247+
fullyQualifiedClusterName := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", accessConfig.Project, accessConfig.Zone, accessConfig.ClusterName)
248248
bucketName := clusterconfig.GCPBucketName(accessConfig.ClusterName, accessConfig.Project, accessConfig.Zone)
249249

250-
clusterExists, err := gcpClient.ClusterExists(gkeClusterName)
250+
clusterExists, err := gcpClient.ClusterExists(fullyQualifiedClusterName)
251251
if err != nil {
252252
exit.Error(err)
253253
}
@@ -257,7 +257,7 @@ var _clusterGCPDownCmd = &cobra.Command{
257257
}
258258

259259
// updating CLI env is best-effort, so ignore errors
260-
operatorLoadBalancerIP, _ := getGCPOperatorLoadBalancerIP(gkeClusterName, gcpClient)
260+
operatorLoadBalancerIP, _ := getGCPOperatorLoadBalancerIP(fullyQualifiedClusterName, gcpClient)
261261

262262
if _flagClusterGCPDisallowPrompt {
263263
fmt.Printf("your cluster named \"%s\" in %s (zone: %s) will be spun down and all apis will be deleted\n\n", accessConfig.ClusterName, accessConfig.Project, accessConfig.Zone)
@@ -291,7 +291,7 @@ var _clusterGCPDownCmd = &cobra.Command{
291291
fmt.Print("○ proceeding with best-effort deletion of the cluster ")
292292
}
293293

294-
_, err = gcpClient.DeleteCluster(gkeClusterName)
294+
_, err = gcpClient.DeleteCluster(fullyQualifiedClusterName)
295295
if err != nil {
296296
fmt.Print("\n\n")
297297
exit.Error(err)
@@ -461,7 +461,7 @@ func createGKECluster(clusterConfig *clusterconfig.GCPConfig, gcpClient *gcp.Cli
461461
fmt.Print("○ creating GKE cluster ")
462462

463463
gkeClusterParent := fmt.Sprintf("projects/%s/locations/%s", clusterConfig.Project, clusterConfig.Zone)
464-
gkeClusterName := fmt.Sprintf("%s/clusters/%s", gkeClusterParent, clusterConfig.ClusterName)
464+
fullyQualifiedClusterName := fmt.Sprintf("%s/clusters/%s", gkeClusterParent, clusterConfig.ClusterName)
465465

466466
gkeClusterConfig := containerpb.Cluster{
467467
Name: clusterConfig.ClusterName,
@@ -571,7 +571,7 @@ func createGKECluster(clusterConfig *clusterconfig.GCPConfig, gcpClient *gcp.Cli
571571
fmt.Print(".")
572572
time.Sleep(5 * time.Second)
573573

574-
cluster, err := gcpClient.GetCluster(gkeClusterName)
574+
cluster, err := gcpClient.GetCluster(fullyQualifiedClusterName)
575575
if err != nil {
576576
return err
577577
}
@@ -593,8 +593,8 @@ func createGKECluster(clusterConfig *clusterconfig.GCPConfig, gcpClient *gcp.Cli
593593
return nil
594594
}
595595

596-
func getGCPOperatorLoadBalancerIP(clusterName string, gcpClient *gcp.Client) (string, error) {
597-
cluster, err := gcpClient.GetCluster(clusterName)
596+
func getGCPOperatorLoadBalancerIP(fullyQualifiedClusterName string, gcpClient *gcp.Client) (string, error) {
597+
cluster, err := gcpClient.GetCluster(fullyQualifiedClusterName)
598598
if err != nil {
599599
return "", err
600600
}

manager/uninstall.sh

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,7 @@ function uninstall_gcp() {
4040

4141
function uninstall_aws() {
4242
echo
43-
4443
aws eks --region $CORTEX_REGION update-kubeconfig --name $CORTEX_CLUSTER_NAME >/dev/null
45-
46-
if [ "$arg1" != "--keep-volumes" ]; then
47-
uninstall_prometheus
48-
uninstall_grafana
49-
fi
50-
5144
eksctl delete cluster --wait --name=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION --timeout=$EKSCTL_TIMEOUT
5245
echo -e "\n✓ done spinning down the cluster"
5346
}

pkg/lib/aws/ec2.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,3 +329,63 @@ func (c *Client) DescribeVpcs() ([]ec2.Vpc, error) {
329329

330330
return vpcs, nil
331331
}
332+
333+
func (c *Client) ListVolumes(tags ...ec2.Tag) ([]ec2.Volume, error) {
334+
var volumes []ec2.Volume
335+
err := c.EC2().DescribeVolumesPages(&ec2.DescribeVolumesInput{}, func(output *ec2.DescribeVolumesOutput, lastPage bool) bool {
336+
if output == nil {
337+
return false
338+
}
339+
for _, volume := range output.Volumes {
340+
if volume == nil {
341+
continue
342+
}
343+
if hasAllEC2Tags(tags, volume.Tags) {
344+
volumes = append(volumes, *volume)
345+
}
346+
}
347+
348+
return true
349+
})
350+
351+
if err != nil {
352+
return nil, errors.WithStack(err)
353+
}
354+
355+
return volumes, nil
356+
}
357+
358+
func (c *Client) DeleteVolume(volumeID string) error {
359+
_, err := c.EC2().DeleteVolume(&ec2.DeleteVolumeInput{
360+
VolumeId: aws.String(volumeID),
361+
})
362+
if err != nil {
363+
return errors.Wrap(err)
364+
}
365+
366+
return nil
367+
}
368+
369+
func hasAllEC2Tags(queryTags []ec2.Tag, allResourceTags []*ec2.Tag) bool {
370+
for _, queryTag := range queryTags {
371+
if !hasEC2Tag(queryTag, allResourceTags) {
372+
return false
373+
}
374+
}
375+
return true
376+
}
377+
378+
// if queryTag's value is nil, the tag will match as long as the key is present in the resource's tags
379+
func hasEC2Tag(queryTag ec2.Tag, allResourceTags []*ec2.Tag) bool {
380+
for _, resourceTag := range allResourceTags {
381+
if queryTag.Key != nil && resourceTag.Key != nil && *queryTag.Key == *resourceTag.Key {
382+
if queryTag.Value == nil {
383+
return true
384+
}
385+
if queryTag.Value != nil && resourceTag.Value != nil && *queryTag.Value == *resourceTag.Value {
386+
return true
387+
}
388+
}
389+
}
390+
return false
391+
}

pkg/lib/gcp/clients.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type clients struct {
3030
gcs *storage.Client
3131
compute *compute.Service
3232
gke *container.ClusterManagerClient
33+
disks *compute.DisksService
3334
}
3435

3536
func (c *Client) GCS() (*storage.Client, error) {
@@ -82,3 +83,18 @@ func (c *Client) GKE() (*container.ClusterManagerClient, error) {
8283
}
8384
return c.clients.gke, nil
8485
}
86+
87+
func (c *Client) Disks() (*compute.DisksService, error) {
88+
if c.clients.disks == nil {
89+
comp, err := c.Compute()
90+
if err != nil {
91+
return nil, err
92+
}
93+
disks := compute.NewDisksService(comp)
94+
if err != nil {
95+
return nil, errors.WithStack(err)
96+
}
97+
c.clients.disks = disks
98+
}
99+
return c.clients.disks, nil
100+
}

pkg/lib/gcp/compute.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,3 +330,28 @@ func (c *Client) GetAvailableZonesForAccelerator(acceleratorType string) ([]stri
330330

331331
return availableAcceleratorZones.SliceSorted(), nil
332332
}
333+
334+
func (c *Client) ListDisks(zone string) ([]compute.Disk, error) {
335+
disksClient, err := c.Disks()
336+
if err != nil {
337+
return nil, err
338+
}
339+
340+
var disks []compute.Disk
341+
342+
err = disksClient.List(c.ProjectID, zone).Pages(context.Background(), func(list *compute.DiskList) error {
343+
for _, disk := range list.Items {
344+
if disk == nil {
345+
continue
346+
}
347+
disks = append(disks, *disk)
348+
}
349+
return nil
350+
})
351+
352+
if err != nil {
353+
return nil, errors.WithStack(err)
354+
}
355+
356+
return disks, nil
357+
}

pkg/lib/gcp/gke.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ import (
2929
grpcStatus "google.golang.org/grpc/status"
3030
)
3131

32-
func (c *Client) GetCluster(clusterName string) (*containerpb.Cluster, error) {
32+
func (c *Client) GetCluster(fullyQualifiedClusterName string) (*containerpb.Cluster, error) {
3333
gke, err := c.GKE()
3434
if err != nil {
3535
return nil, err
3636
}
3737
cluster, err := gke.GetCluster(context.Background(), &containerpb.GetClusterRequest{
38-
Name: clusterName,
38+
Name: fullyQualifiedClusterName,
3939
})
4040
if err != nil {
4141
return nil, errors.WithStack(err)

pkg/types/clusterconfig/cluster_config_aws.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ var CoreConfigStructFieldValidations = []*cr.StructFieldValidation{
214214
StructField: "ClusterName",
215215
StringValidation: &cr.StringValidation{
216216
Default: "cortex",
217-
MaxLength: 63,
217+
MaxLength: 54, // leaves room for 8 char uniqueness string (and "-") for bucket name (63 chars max)
218218
MinLength: 3,
219219
Validator: validateClusterName,
220220
},
@@ -704,7 +704,7 @@ var AccessValidation = &cr.StructValidation{
704704
StructField: "ClusterName",
705705
StringValidation: &cr.StringValidation{
706706
Default: "cortex",
707-
MaxLength: 63,
707+
MaxLength: 54, // leaves room for 8 char uniqueness string (and "-") for bucket name (63 chars max)
708708
MinLength: 3,
709709
Validator: validateClusterName,
710710
},
@@ -805,17 +805,8 @@ func (cc *Config) Validate(awsClient *aws.Client, skipQuotaVerification bool) er
805805
}
806806

807807
if cc.Bucket == "" {
808-
bucketID := hash.String(accountID + cc.Region)[:10]
809-
810-
defaultBucket := cc.ClusterName + "-" + bucketID
811-
if len(defaultBucket) > 63 {
812-
defaultBucket = defaultBucket[:63]
813-
}
814-
if strings.HasSuffix(defaultBucket, "-") {
815-
defaultBucket = defaultBucket[:len(defaultBucket)-1]
816-
}
817-
818-
cc.Bucket = defaultBucket
808+
bucketID := hash.String(accountID + cc.Region)[:8] // this is to "guarantee" a globally unique name
809+
cc.Bucket = cc.ClusterName + "-" + bucketID
819810
} else {
820811
bucketRegion, _ := aws.GetBucketRegion(cc.Bucket)
821812
if bucketRegion != "" && bucketRegion != cc.Region { // if the bucket didn't exist, we will create it in the correct region, so there is no error

0 commit comments

Comments
 (0)