diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 143cee9c8..2b93c0daf 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -78,6 +78,8 @@ from ..utils.file import write_tmp_file from . import cluster_gcluster from .common import set_cluster_command +import shutil +import os def cluster_adapt(args) -> None: @@ -247,6 +249,10 @@ def cluster_create(args) -> None: get_cluster_credentials(args) + update_coredns_command_code = update_coredns_if_necessary(args) + if update_coredns_command_code != 0: + xpk_exit(update_cluster_command_code) + k8s_client = setup_k8s_env(args) install_storage_crd(k8s_client) @@ -702,6 +708,259 @@ def cluster_create_ray_cluster(args) -> None: cluster_create(args) +def install_jq(args): + """Installs 'jq' utility.""" + command_jq_install = 'sudo apt install jq -y' + xpk_print("Task: 'Install jq' in progress.") + return_code = run_command_with_updates(command_jq_install, 'Install jq', args) + if return_code != 0: + xpk_print(f'Install jq error {return_code}') + xpk_exit(return_code) + + +def clone_coredns_deployment_repo(args, coredns_repo_full_path: str): + """Clones the CoreDNS deployment repository if it doesn't exist.""" + if os.path.exists(coredns_repo_full_path): + xpk_print( + f"Directory '{coredns_repo_full_path}' already exists, skip git clone." + ) + return + command_git_clone = ( + 'git clone https://github.com/coredns/deployment.git' + f' {coredns_repo_full_path}' + ) + xpk_print( + "Task: 'Clone deployment' in progress, Target" + f' directory:{coredns_repo_full_path}.' + ) + return_code = run_command_with_updates( + command_git_clone, 'Clone deployment', args + ) + if return_code != 0: + xpk_print(f'Clone deployment error {return_code}') + xpk_exit(return_code) + + +def deploy_coredns_manifests(args, coredns_k8s_path: str): + """Deploys CoreDNS manifests to the cluster.""" + if not os.path.isdir(coredns_k8s_path): + xpk_print( + f"Error:CoreDNS Kubernetes path '{coredns_k8s_path}' does not exist." + ' Has git clone been successful?' + ) + xpk_exit(1) + original_cwd = os.getcwd() + try: + os.chdir(coredns_k8s_path) + xpk_print(f'Current working directory changed to: {os.getcwd()}') + + command_deploy_coredns = './deploy.sh | kubectl apply -f -' + xpk_print( + f"Task: 'Deploy CoreDNS' in progress, Located at '{coredns_k8s_path}'" + ) + return_code = run_command_with_updates( + command_deploy_coredns, 'Deploy CoreDNS', args + ) + if return_code != 0: + xpk_print(f'Deploy CoreDNS error {return_code}') + + finally: + xpk_print(f'Restoring working directory to: {original_cwd}') + os.chdir(original_cwd) + if return_code != 0: + xpk_exit(return_code) + + +def scale_down_deployment( + args, deployment_name: str, namespace: str = 'kube-system' +): + """Scales down a specified Kubernetes deployment to 0 replicas.""" + command = ( + f'kubectl scale deployment {deployment_name} --replicas=0' + f' --namespace={namespace}' + ) + xpk_print(f"Task: 'Scaling down {deployment_name}' in progress") + return_code = run_command_with_updates( + command, f'Scale down {deployment_name}', args + ) + if return_code != 0: + xpk_print(f'Scale down {deployment_name} error {return_code}') + xpk_exit(return_code) + xpk_print(f'\n{deployment_name} has been scaled down.') + + +def scale_up_coredns(args, replicas: int = 15, namespace: str = 'kube-system'): + """Scales up the CoreDNS deployment to a specified number of replicas.""" + command_coredns_scale = ( + f'kubectl scale deployment coredns --replicas={replicas} -n {namespace}' + ) + xpk_print(f"Task: 'Scale CoreDNS' in progress (to {replicas} replicas)") + return_code = run_command_with_updates( + command_coredns_scale, 'Scale CoreDNS', args + ) + if return_code != 0: + xpk_print(f'Scale CoreDNS error {return_code}') + xpk_exit(return_code) + + +def check_deployment_exists(args, deployment_name: str, namespace: str) -> bool: + """Check for the existence of a specific Deployment in a given namespace.""" + command = ( + f'kubectl get deployment {deployment_name} -n' + f' {namespace} --ignore-not-found' + ) + result = run_command_with_updates( + command, 'Waiting for kubeDNS to be checked.', args + ) + return result + + +def verify_coredns_readiness( + args, timeout: int = 120, namespace: str = 'kube-system' +): + """Verifies CoreDNS readiness using kubectl wait commands.""" + xpk_print('Now verifying CoreDNS readiness...') + kube_dns_exists = check_deployment_exists(args, 'kube-dns', namespace) + if kube_dns_exists: + # Wait for kube-dns to be fully scaled down + command_kube_dns_wait_scaled_down = ( + 'kubectl wait deployment/kube-dns' + " --for=jsonpath='{.status.replicas}'=0" + f' --namespace={namespace} --timeout={timeout}s' + ) + xpk_print('Verifying if kube-dns has scaled down...') + return_code_kube_dns = run_command_with_updates( + command_kube_dns_wait_scaled_down, 'Wait for kube-dns scale down', args + ) + if return_code_kube_dns != 0: + xpk_print('kube-dns did not scale down successfully within the timeout.') + xpk_exit(1) # Exit if kube-dns cannot scale down + else: + xpk_print('kube-dns has successfully scaled down.') + else: + xpk_print('kube-dns deployment not found.') + # Wait for CoreDNS to be fully scaled up and available + command_coredns_wait_available = ( + 'kubectl wait deployment/coredns --for=condition=Available=true' + f' --namespace={namespace} --timeout={timeout}s' + ) + xpk_print('Verifying if CoreDNS is available...') + return_code_coredns = run_command_with_updates( + command_coredns_wait_available, 'Wait for coredns available', args + ) + if return_code_coredns != 0: + xpk_print( + 'CoreDNS verification failed, it might not have fully started within' + ' the timeout.' + ) + xpk_exit(1) # Exit if coredns cannot become available + + xpk_print('CoreDNS has successfully started and passed verification.') + + +def cleanup_coredns_repo(coredns_repo_full_path: str): + """Deletes the cloned CoreDNS deployment directory.""" + xpk_print( + "Task: 'Deleting CoreDNS deployment directory' in progress:" + f' {coredns_repo_full_path}' + ) + try: + shutil.rmtree(coredns_repo_full_path) + xpk_print(f'Successfully deleted directory: {coredns_repo_full_path}') + except OSError as e: + xpk_print(f'Error deleting directory {coredns_repo_full_path}: {e}') + + +def update_coredns(args): + """Updates and deploys CoreDNS within a cluster. + + Args: + args: user provided arguments for running the command. + + Returns: + 0 if successful and 1 otherwise. + """ + coredns_repo_dir = os.path.expanduser('/tmp/') + coredns_repo_dir_name = 'deployment' + coredns_repo_full_path = os.path.join(coredns_repo_dir, coredns_repo_dir_name) + coredns_k8s_path = os.path.join(coredns_repo_full_path, 'kubernetes') + # 1. Install jq + install_jq(args) + + # 2. Clone CoreDNS deployment repository + clone_coredns_deployment_repo(args, coredns_repo_full_path) + + # 3. Deploy CoreDNS to the cluster + deploy_coredns_manifests(args, coredns_k8s_path) + + # 4. Scale down kube-dns-autoscaler + scale_down_deployment(args, 'kube-dns-autoscaler') + + # 5. Scale down kube-dns + scale_down_deployment(args, 'kube-dns') + + # 6. Scale up coredns and verify readiness + scale_up_coredns(args, replicas=15) + verify_coredns_readiness(args, timeout=120) + + xpk_print('The CoreDNS setup process has been completed.') + + # 7. Cleanup + cleanup_coredns_repo(coredns_repo_full_path) + + return 0 + + +def coredns_deployment_exists(args, namespace: str = 'kube-system') -> bool: + """Checks if the CoreDNS deployment exists in the given namespace. + + Args: + namespace: The Kubernetes namespace to check for the CoreDNS deployment. + + Returns: + True if the 'coredns' deployment exists, False otherwise. + """ + command = f'kubectl get deployment coredns -n {namespace}' + xpk_print( + "Task: 'Checking CoreDNS deployment existence' in progress for" + f' namespace: {namespace}' + ) + return_code = run_command_with_updates( + command, f'Check CoreDNS deployment in {namespace}', args + ) + if return_code == 0: + verify_coredns_readiness(args) + xpk_print(f"CoreDNS deployment 'coredns' found in namespace '{namespace}'.") + return True + else: + xpk_print( + f"CoreDNS deployment 'coredns' NOT found in namespace '{namespace}' or" + ' an error occurred.' + ) + return False + + +def update_coredns_if_necessary(args) -> int: + """Updates and deploys CoreDNS within the cluster if it's not already present. + + This function checks for the existence of the CoreDNS deployment. + If it's not found, it proceeds to deploy and configure CoreDNS. + + Args: + args: User-provided arguments for running the command. + + Returns: + 0 if successful (CoreDNS was already present or successfully deployed), + and 1 otherwise. + """ + if coredns_deployment_exists(args, namespace='kube-system'): + xpk_print('Skipping CoreDNS deployment since it already exists.') + return 0 + else: + xpk_print('CoreDNS deployment not found. Proceeding with CoreDNS setup.') + return update_coredns(args) + + def create_cluster_if_necessary( args, gke_control_plane_version: str, system: SystemCharacteristics ) -> int: