# Copyright 2024 Confluent Inc.
import glob
import hashlib
import logging
import os
import re
import time
from datetime import datetime, timedelta

import boto3

from terraform.kafka_runner.util import HASH_ALGORITHM, AWS_REGION, AWS_ACCOUNT_ID, AMI_NAME_MAX_LENGTH, \
    BASE_KAFKA_DIR, run, WORKER_AMI_JSON, WORKER_AMI_NAME, INSTANCE_TYPE, IPV6_SUBNET_ID, IPV4_SUBNET_ID, AMI, \
    AWS_PACKER_JSON, VPC_ID, KAFKA_BRANCH, ALLOW_ALL_SECURITY_GROUP_ID
13+
# Extracts the semantic version (major.minor.patch) from a confluent-packages-X.Y.Z... path.
# Raw string avoids the invalid-escape-sequence warning for '\d' on modern Python.
GET_VERSION_FROM_PACKAGES_RE = re.compile(r'.*confluent-packages-(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+).*')
15+
def hash_files(file_list, **kwargs):
    """
    Return a hex digest derived from the contents of the given files.

    Arguments:
        file_list: a list of file paths
        **kwargs: additional key-value pairs folded into the hash
    """
    digest = hashlib.new(HASH_ALGORITHM)

    # Fold in the extras first, in a deterministic key order.
    digest.update(str(sorted(kwargs.items())).encode())

    # Then each file's raw bytes, in sorted-path order so the result is stable
    # regardless of the order the caller listed them in.
    for path in sorted(file_list):
        with open(path, "rb") as handle:
            digest.update(handle.read())
    return digest.hexdigest()
32+
def ensure_trailing_separator(dirname):
    """Return *dirname* guaranteed to end with the OS path separator.
    E.g::
        my/path -> my/path/ # Add trailing '/'
        my/path/ -> my/path/ # No change
    """
    sep = os.path.sep
    return dirname if dirname.endswith(sep) else dirname + sep
42+
43+
def compute_packer_hash(**extras):
    """Compute a hash which depends on the contents and directory layout of Packer files.

    Since Packer files are changed infrequently, hopefully this provides a reasonable way
    to cache and reuse a pre-created ami.

    Arguments:
        **extras: named user arguments to pass to packer (folded into the hash)

    Returns the hex digest string.
    """
    previous_wd = os.getcwd()
    os.chdir(BASE_KAFKA_DIR)
    # try/finally so the working directory is restored even if globbing or
    # hashing raises (previously an exception left the process chdir'd).
    try:
        dirname = os.path.dirname(AWS_PACKER_JSON)

        def with_extension(extension):
            # All files next to the packer template with the given extension.
            return glob.glob(os.path.join(dirname, '*.%s' % extension))

        file_list = with_extension('sh') + with_extension('json') + [
            os.path.join(BASE_KAFKA_DIR, "resources/requirements.txt")]

        logging.info('Files considered for packer_hash: %s', ', '.join(file_list))
        logging.info('Extras considered for packer_hash: %s', extras)

        return hash_files(file_list, **extras)
    finally:
        os.chdir(previous_wd)
69+
def image_from(name=None, image_id=None, region_name=AWS_REGION):
    """Given the image name or id, return a boto3 object corresponding to the image, or None if no such image exists."""
    # Exactly one of name / image_id must be provided.
    if bool(name) == bool(image_id):
        raise ValueError('image_from requires either name or image_id')
    ec2 = boto3.resource("ec2", region_name=region_name)
    criteria = {'image-id': image_id, 'name': name}
    filters = [{'Name': key, 'Values': [value]} for key, value in criteria.items() if value]
    matches = ec2.images.filter(Owners=[AWS_ACCOUNT_ID], Filters=filters)
    return next(iter(matches), None)
81+
def create_ami(image_name, source_ami=AMI, region_name=AWS_REGION, volume_size=60,
               packer_json=AWS_PACKER_JSON, instance_type=INSTANCE_TYPE, **extras):
    """Create a new ami using packer!

    Arguments:
        image_name: name for the resulting AMI
        source_ami: base AMI to build from
        region_name: AWS region to build in
        volume_size: root volume size in GB
        packer_json: path of the packer template to build
        instance_type: EC2 instance type used for the build
        **extras: extra packer "-var" definitions; entries with a None value are skipped

    Returns the boto3 image object for the newly created AMI.
    """
    previous_wd = os.getcwd()  # fixed typo: was 'previons_wd'
    os.chdir(BASE_KAFKA_DIR)
    # try/finally so the working directory is restored even if packer fails.
    try:
        extras.setdefault('linux_distro', os.environ.get('LINUX_DISTRO', 'ubuntu'))

        cmd = 'packer build'
        cmd += ' -var "region=%s"' % region_name
        cmd += ' -var "source_ami=%s"' % source_ami
        cmd += ' -var "ami_name=%s"' % image_name
        cmd += ' -var "volume_size=%s"' % volume_size
        cmd += ' -var "instance_type=%s"' % instance_type
        cmd += ' -var "vpc_id=%s"' % VPC_ID
        cmd += ' -var "subnet_id=%s"' % IPV4_SUBNET_ID
        cmd += ' -var "security_group_id=%s"' % ALLOW_ALL_SECURITY_GROUP_ID
        cmd += ''.join([' -var "{}={}"'.format(*v) for v in extras.items() if v[1] is not None])
        cmd += ' ' + packer_json

        logging.info("Creating a new image with name %s in region %s..." % (image_name, region_name))
        logging.info("This may take 10-20 minutes...")
        run(cmd, allow_fail=False, print_output=True)

        image = image_from(name=image_name, region_name=region_name)
        assert image is not None, "Expected aws image %s to exist after running packer!" % image_name
    finally:
        os.chdir(previous_wd)

    logging.info('Successfully created new image with id = %s', image.image_id)

    return image
112+
def wait_ready(image_id, region_name=AWS_REGION, timeout_sec=1200):
    """Block until the given image_id is ready.

    Arguments:
        image_id: the AMI id to wait for
        region_name: AWS region to query
        timeout_sec: maximum seconds to wait before giving up

    Raises:
        AssertionError: if no image with the given id exists.
        TimeoutError: if the image is not available within timeout_sec.
    """
    logging.info("Waiting for %s to become available..." % image_id)
    start = time.time()
    backoff = 5
    counter = 0
    while time.time() - start <= timeout_sec:
        image = image_from(image_id=image_id, region_name=region_name)
        assert image is not None, "Expected an image to exist with id %s, but it doesn't." % image_id

        if image.state.lower() == "available":
            logging.info("Image %s is available." % image_id)
            return
        time.sleep(backoff)
        counter += 1

        # progress bar, indicate + for each minute elapsed
        # (integer division: the float modulus happened to work but was fragile)
        if counter % (60 // backoff) == 0:
            print("+")
        else:
            print("-")

    # Previously the loop fell through silently on timeout and callers proceeded
    # with an unready AMI, failing later in confusing ways. Fail loudly instead.
    raise TimeoutError("Image %s did not become available within %s seconds" % (image_id, timeout_sec))
135+
def package_base_ami(instance_type=INSTANCE_TYPE, source_ami=AMI, ssh_account=None, volume_size=60, **hash_extras):
    """
    Create (or reuse a cached) base AMI for the workers.

    In this base ami we download common modules; the target (worker) ami is then
    built on top of it.

    :param instance_type: instance to use create ami
    :param source_ami: base ami to spin up the instance
    :param ssh_account: which account to use ssh into the instance
    :param volume_size: size of the instance
    :param hash_extras: other parameters (folded into the cache hash)
    :return: the image id of the base AMI
    """
    ssh_account = "ubuntu" if ssh_account is None else ssh_account

    packer_hash = compute_packer_hash(source_ami=source_ami, **hash_extras)
    logging.info("packer_hash: %s" % packer_hash)

    # Truncate to the maximum AMI name length.
    ami_name = ("kafka-%s-%s" % (packer_hash, KAFKA_BRANCH))[:AMI_NAME_MAX_LENGTH]
    logging.info("Base AMI name: %s (created from %s)" % (ami_name, source_ami))

    # Reuse a cached image when one exists; otherwise build a fresh one.
    image = image_from(name=ami_name)
    if image is None:
        logging.info("No image matching %s." % ami_name)
        image = create_ami(ami_name, instance_type=instance_type, source_ami=source_ami,
                           ssh_account=ssh_account, volume_size=volume_size,
                           packer_json=AWS_PACKER_JSON, **hash_extras)
    else:
        logging.info("Found image matching %s: %s" % (ami_name, image))
        # Corner case: the cached image may still be registering.
        wait_ready(image.image_id)

    return image.image_id
167+
def package_worker_ami(install_type, volume_size, source_ami=AMI,
                       instance_type=INSTANCE_TYPE, ssh_account=None, **extras):
    """ Create a worker AMI with Confluent Platform """
    account = "ubuntu" if ssh_account is None else ssh_account

    # Build (or reuse) the base image first; the worker image layers on top of it.
    base_ami = package_base_ami(instance_type=instance_type, source_ami=source_ami,
                                ssh_account=account, volume_size=volume_size, **extras)

    logging.info("Worker AMI name: %s" % WORKER_AMI_NAME)
    worker_image = create_ami(WORKER_AMI_NAME, source_ami=base_ami, packer_json=WORKER_AMI_JSON,
                              install_type=install_type, ssh_account=account,
                              volume_size=volume_size, instance_type=instance_type, **extras)
    delete_old_worker_amis()
    return worker_image.image_id
181+
def delete_old_worker_amis():
    """ Delete worker AMIs (and their EBS snapshots) older than 30 days """
    logging.info('Checking for old worker AMIs to delete...')

    ec2 = boto3.resource("ec2", region_name=AWS_REGION)
    worker_filters = [{'Name': 'tag:Service', 'Values': ['ce-kafka']},
                      {'Name': 'tag:CreatedBy', 'Values': ['kafka-system-test']}]
    for image in ec2.images.filter(Owners=[AWS_ACCOUNT_ID], Filters=worker_filters):
        # Creation dates look like '2024-01-31T12:34:56.000Z'; '%f' accepts any
        # fractional-second value instead of assuming exactly '.000'.
        created_date = datetime.strptime(image.creation_date, "%Y-%m-%dT%H:%M:%S.%fZ")

        if datetime.utcnow() - created_date > timedelta(days=30):
            snapshot_ids = [s['Ebs']['SnapshotId'] for s in image.block_device_mappings if 'Ebs' in s]
            logging.info('Deleting worker AMI {} with snapshot(s): {}'.format(image.id, snapshot_ids))

            image.deregister()
            # Guard against an empty list: filtering with SnapshotIds=[] describes
            # ALL visible snapshots, which would delete snapshots unrelated to this AMI.
            if snapshot_ids:
                for snapshot in ec2.snapshots.filter(SnapshotIds=snapshot_ids):
                    snapshot.delete()