diff --git a/src/xpk/core/scheduling.py b/src/xpk/core/scheduling.py index 8bc18c66..00c23f72 100644 --- a/src/xpk/core/scheduling.py +++ b/src/xpk/core/scheduling.py @@ -25,86 +25,83 @@ def check_if_workload_can_schedule(args, system: SystemCharacteristics) -> bool: - """Check if workload can schedule based on the cluster resources (tpu_type and maximum VM in cluster). + """Check if workload can schedule based on the cluster resources. + + This function validates that the resource requested by the user exists in the + cluster's resource manifest (a ConfigMap) and that the requested quantity + (e.g., number of VMs) does not exceed the available quantity. Args: - args: user provided arguments for running the command. - system: system characteristics + args: User-provided arguments for running the command. + system: System characteristics derived from the user's request. Returns: - returns true if workload can schedule, otherwise returns false. + True if the workload can be scheduled, otherwise False. """ resources_configmap_name = f'{args.cluster}-{CLUSTER_RESOURCES_CONFIGMAP}' cluster_config_map = get_cluster_configmap(args, resources_configmap_name) - # Prevents workload creation failure for existing clusters with no ConfigMap + # If no ConfigMap exists, we cannot validate, so we optimistically proceed. + # This maintains compatibility with older cluster setups. if cluster_config_map is None: xpk_print( - 'No ConfigMap exist for cluster with the name' - f' {resources_configmap_name}.' + f"Warning: Could not find resource ConfigMap '{resources_configmap_name}'. " + "Proceeding without resource validation." ) return True - # Check for gke accelerator type: - missing_gke_accelerator_type = False - if not cluster_config_map.get(system.gke_accelerator): + # The user-facing device type (e.g., 'v5litepod-32') is the single source + # of truth for identifying the resource in the cluster's manifest. + user_facing_device_type = args.tpu_type if args.tpu_type else args.device_type + + # --- Primary Validation --- + # Check if the cluster's resource manifest contains an entry for the exact + # device type the user requested. This is the only reliable existence check. + if user_facing_device_type not in cluster_config_map: xpk_print( - f'Gke Accelerator Type Check: {args.workload} is requesting' - f' {system.gke_accelerator} but cluster only contains' - f' {cluster_config_map.keys()}. ' + f"Device Type Check Failed: Workload '{args.workload}' is requesting " + f"device type '{user_facing_device_type}', but the cluster's resource " + f"manifest only contains entries for: {list(cluster_config_map.keys())}. " + "The cluster may not be provisioned with this hardware type." ) - missing_gke_accelerator_type = True - elif ( - cluster_config_map[system.gke_accelerator] + return False + + # --- Quantity Validation --- + + # Handle autoprovisioning capacity checks. + if ( + cluster_config_map[user_facing_device_type] == AUTOPROVISIONING_CONFIG_VALUE ): - # Run total chip check when in autoprovisioning mode. max_chips_in_cluster = int( - cluster_config_map[AUTOPROVISIONING_CONFIG_MAXIMUM_KEY] + cluster_config_map.get(AUTOPROVISIONING_CONFIG_MAXIMUM_KEY, 0) ) num_chips_in_workload = get_total_chips_requested_from_args(args, system) if num_chips_in_workload > max_chips_in_cluster: xpk_print( - f'{args.workload} is requesting {num_chips_in_workload} chips but' - f' the cluster {args.cluster} supports up to {max_chips_in_cluster}.' - ' Resize the cluster to support more chips with' - ' `xpk cluster create --autoprovisioning-max-chips=X ...`' + f"Chip Request Exceeds Limit: Workload '{args.workload}' requests " + f"{num_chips_in_workload} chips, but the autoprovisioning cluster " + f"'{args.cluster}' is configured for a maximum of {max_chips_in_cluster} chips." ) return False - return True + return True # For autoprovisioning, chip count is sufficient. - # Check for device type - missing_device_type = False - device_type = system.device_type - if device_type not in cluster_config_map: - xpk_print( - f'Device Type Check: {args.workload} is requesting {device_type} but ' - f'cluster only contains {cluster_config_map.keys()}. ' - ) - missing_device_type = True + # For statically-sized clusters, check if the number of requested VMs fits. + max_vm_in_cluster = int(cluster_config_map[user_facing_device_type]) + if system.accelerator_type == AcceleratorType['GPU']: + vm_required_by_workload = args.num_nodes + else: + vm_required_by_workload = args.num_slices * system.vms_per_slice - if missing_device_type and missing_gke_accelerator_type: + if vm_required_by_workload > max_vm_in_cluster: xpk_print( - 'Both Device Type and GKE Accelerator Type checks failed.' - f' XPK will not create the workload {args.workload}.' + f"VM Request Exceeds Capacity: Workload '{args.workload}' requests " + f"{vm_required_by_workload} VMs for {args.num_slices} slice(s) of type " + f"'{user_facing_device_type}', but the cluster only has " + f"{max_vm_in_cluster} VMs of that type available." ) return False - else: - # Check if the size of the workload will fit in the cluster. - max_vm_in_cluster = int(cluster_config_map[device_type]) - if system.accelerator_type == AcceleratorType['GPU']: - vm_required_by_workload = args.num_nodes - else: - vm_required_by_workload = args.num_slices * system.vms_per_slice - if vm_required_by_workload > max_vm_in_cluster: - xpk_print( - f'{args.workload} is requesting {args.num_slices} slice/slices of' - f' {device_type}, which is {vm_required_by_workload} VMs, but the' - f' cluster only contains {max_vm_in_cluster} VMs of {device_type}.' - ' XPK will not create this workload.' - ) - return False return True