Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion goldens/Workload_create.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ docker tag dry-run-runner gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Upload Docker Image` is implemented by the following command not running since it is a dry run.
docker push gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Creating Workload` is implemented by the following command not running since it is a dry run.
kubectl apply -f abc33690f7a11b2ba50a8f949970fd3ba812f088367b7f64260729f01f41a231
kubectl apply -f e21c8ebdc21d15a852187058c096898c486d3b1066e67dcfb67e5052a1d0a7fa
[XPK] Task: `GKE Dashboard List` is implemented by the following command not running since it is a dry run.
gcloud monitoring dashboards list --project=golden-project --filter="displayName:'GKE - TPU Monitoring Dashboard'" --format="value(name)" --verbosity=error
[XPK] Check statistics and outlier mode of GKE metrics here: https://console.cloud.google.com/monitoring/dashboards/builder/0?project=golden-project&f.rlabel.cluster_name.ClusterName=golden-cluster. To view the metric data for your workload, select golden-workload from the JobName filter on the dashboard.
Expand Down
2 changes: 1 addition & 1 deletion goldens/Workload_create_pathways.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ docker tag dry-run-runner gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Upload Docker Image` is implemented by the following command not running since it is a dry run.
docker push gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Creating Workload` is implemented by the following command not running since it is a dry run.
kubectl apply -f 574963d6d441695d681ff94ad241e713559f64b4ce519f4f1e0708c659f1c25d
kubectl apply -f 6fb0f350cf4e0dccc71f77392d12db3de6371d5148519657046613794358bfce
[XPK] Task: `GKE Dashboard List` is implemented by the following command not running since it is a dry run.
gcloud monitoring dashboards list --project=golden-project --filter="displayName:'GKE - TPU Monitoring Dashboard'" --format="value(name)" --verbosity=error
[XPK] Check statistics and outlier mode of GKE metrics here: https://console.cloud.google.com/monitoring/dashboards/builder/0?project=golden-project&f.rlabel.cluster_name.ClusterName=golden-cluster. To view the metric data for your workload, select golden-workload from the JobName filter on the dashboard.
Expand Down
28 changes: 28 additions & 0 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
get_cpu_affinity,
get_gpu_scheduler,
create_sub_slicing_annotations,
create_placement_policy_label,
is_placement_policy_supported,
)
from ..core.storage import (
GCE_PD_TYPE,
Expand Down Expand Up @@ -143,6 +145,7 @@
nodeSelector:
{accelerator_label}
{machine_label}
{placement_policy_label}
{autoprovisioning_args}
priorityClassName: {args.priority}
hostNetwork: true
Expand Down Expand Up @@ -192,6 +195,8 @@
{gpu_scheduler}
priorityClassName: {args.priority}
restartPolicy: Never
nodeSelector:
{placement_policy_label}
imagePullSecrets:
- name: {args.docker_image_pull_secret}
hostNetwork: true
Expand Down Expand Up @@ -237,6 +242,8 @@
spec:
priorityClassName: {args.priority}
restartPolicy: Never
nodeSelector:
{placement_policy_label}
imagePullSecrets:
- name: {args.docker_image_pull_secret}
dnsPolicy: ClusterFirstWithHostNet
Expand Down Expand Up @@ -272,6 +279,7 @@
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
priorityClassName: {args.priority}
nodeSelector:
{placement_policy_label}
{autoprovisioning_args}
pathwaysDir: {args.pathways_gcs_location} #This bucket needs to be created in advance.
controller:
Expand Down Expand Up @@ -517,6 +525,11 @@ def workload_create(args) -> None:
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
annotations=annotations,
placement_policy_label=(
create_placement_policy_label(system)
if is_placement_policy_supported(system)
else ''
),
)

sub_networks = get_cluster_subnetworks()
Expand All @@ -541,6 +554,11 @@ def workload_create(args) -> None:
service_account=service_account,
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
placement_policy_label=(
create_placement_policy_label(system)
if is_placement_policy_supported(system)
else ''
),
)

elif args.use_pathways and ensure_pathways_workload_prerequisites(
Expand All @@ -558,6 +576,11 @@ def workload_create(args) -> None:
user_workload=get_user_workload_for_pathways(args, system),
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
placement_policy_label=(
create_placement_policy_label(system)
if is_placement_policy_supported(system)
else ''
),
)
else:
container, debugging_dashboard_id = get_user_workload_container(
Expand Down Expand Up @@ -585,6 +608,11 @@ def workload_create(args) -> None:
create_sub_slicing_annotations(args.sub_slicing_topology)
)
),
placement_policy_label=(
create_placement_policy_label(system)
if is_placement_policy_supported(system)
else ''
),
machine_label=create_machine_label(system.accelerator_type, system),
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
Expand Down
8 changes: 3 additions & 5 deletions src/xpk/core/nodepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from typing import List
from ..utils.console import ask_for_user_consent, xpk_print
from ..utils.topology import is_topology_valid
from .scheduling import get_placement_policy_name, is_placement_policy_supported
from .capacity import (
AUTOPROVISIONING_CONFIG_VALUE,
H100_MEGA_DEVICE_TYPE,
Expand Down Expand Up @@ -258,10 +258,8 @@ def run_gke_node_pool_create_command(
return 1

placement_args = ''
if system.requires_workload_policy and is_topology_valid(system.topology):
placement_policy = (
f'{system.device_type}-{system.topology}-placement-policy'
)
if is_placement_policy_supported(system):
placement_policy = get_placement_policy_name(system)
ensure_resource_policy_exists(placement_policy, args, system.topology)
placement_args = f' --placement-policy={placement_policy}'

Expand Down
22 changes: 12 additions & 10 deletions src/xpk/core/nodepool_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,23 @@ def mock_nodepool_dependencies(mocker):
)
mocker.patch("xpk.core.nodepool.run_commands", return_value=0)
mocker.patch("xpk.core.nodepool.ask_for_user_consent", return_value=True)
mock_is_topology_valid = mocker.patch("xpk.core.nodepool.is_topology_valid")
mock_is_placement_policy_supported = mocker.patch(
"xpk.core.nodepool.is_placement_policy_supported"
)
mock_ensure_resource_policy = mocker.patch(
"xpk.core.nodepool.ensure_resource_policy_exists"
)
return mock_is_topology_valid, mock_ensure_resource_policy
return mock_is_placement_policy_supported, mock_ensure_resource_policy


def test_placement_policy_created_for_gpu_with_valid_topology(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is created for GPUs with a valid topology."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = True
mock_is_placement_policy_supported.return_value = True
args = mocker.Mock(
tpu_type=None,
device_type="h100-80gb-8",
Expand Down Expand Up @@ -188,10 +190,10 @@ def test_placement_policy_not_created_for_gpu_with_invalid_topology(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is not created for GPUs with an invalid topology."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = False
mock_is_placement_policy_supported.return_value = False
args = mocker.Mock(
tpu_type=None,
device_type="h100-80gb-8",
Expand All @@ -218,10 +220,10 @@ def test_placement_policy_created_for_tpu7x_with_valid_topology(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is created for tpu7x with a valid topology."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = True
mock_is_placement_policy_supported.return_value = True
args = mocker.Mock(
tpu_type="tpu7x-8",
device_type=None,
Expand Down Expand Up @@ -251,10 +253,10 @@ def test_placement_policy_not_created_for_non7x_tpu(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is not created for non-tpu7x TPUs."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = True
mock_is_placement_policy_supported.return_value = False
args = mocker.Mock(
tpu_type="v6e",
device_type=None,
Expand Down
14 changes: 14 additions & 0 deletions src/xpk/core/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

from ..utils.console import xpk_print
from ..utils.topology import is_topology_valid
from ..utils.execution_context import is_dry_run
from .capacity import AUTOPROVISIONING_CONFIG_MAXIMUM_KEY, AUTOPROVISIONING_CONFIG_VALUE
from .resources import CLUSTER_RESOURCES_CONFIGMAP, get_cluster_configmap
Expand Down Expand Up @@ -303,3 +304,16 @@ def create_sub_slicing_annotations(sub_slicing_topology: str) -> list[str]:
),
f'cloud.google.com/gke-tpu-slice-topology: {sub_slicing_topology}',
]


def create_placement_policy_label(system: SystemCharacteristics) -> str:
  """Build the nodeSelector label line that pins pods to the placement policy.

  Args:
    system: system characteristics of the target accelerator.

  Returns:
    A 'key: value' label string for GKE node selection, keyed on
    cloud.google.com/placement-policy-name.
  """
  return (
      'cloud.google.com/placement-policy-name:'
      f' {get_placement_policy_name(system)}'
  )


def get_placement_policy_name(system: SystemCharacteristics) -> str:
  """Derive the resource-policy name for the given system.

  The name is composed as '<device_type>-<topology>-placement-policy'.
  """
  parts = (system.device_type, system.topology, 'placement-policy')
  return '-'.join(parts)


def is_placement_policy_supported(system: SystemCharacteristics) -> bool:
  """Return True when a compact placement policy applies to this system.

  A policy is only used for systems that require a workload policy and
  whose topology string is valid.
  """
  if not system.requires_workload_policy:
    return False
  return is_topology_valid(system.topology)
83 changes: 82 additions & 1 deletion src/xpk/core/scheduling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
limitations under the License.
"""

from .scheduling import create_sub_slicing_annotations
from .scheduling import create_sub_slicing_annotations, create_placement_policy_label, get_placement_policy_name, is_placement_policy_supported
from .system_characteristics import SystemCharacteristics, AcceleratorType


def test_create_sub_slicing_annotations_returns_valid_annotations():
Expand All @@ -29,3 +30,83 @@ def test_create_sub_slicing_annotations_returns_valid_annotations():
),
'cloud.google.com/gke-tpu-slice-topology: 2x2',
]


def test_create_placement_policy_label_returns_valid_label():
  """The label embeds the derived '<device>-<topology>-placement-policy' name."""
  system = SystemCharacteristics(
      accelerator_type=AcceleratorType.TPU,
      chips_per_vm=1,
      device_type='tpu7x',
      gce_machine_type='tpu7x-standard-1t',
      gke_accelerator='tpu7x',
      requires_workload_policy=False,
      supports_sub_slicing=False,
      topology='1x1x1',
      vms_per_slice=1,
  )
  expected = (
      'cloud.google.com/placement-policy-name: tpu7x-1x1x1-placement-policy'
  )
  assert create_placement_policy_label(system) == expected


def test_get_placement_policy_name_returns_valid_name():
  """Policy name follows the '<device_type>-<topology>-placement-policy' scheme."""
  system = SystemCharacteristics(
      accelerator_type=AcceleratorType.TPU,
      chips_per_vm=1,
      device_type='tpu7x',
      gce_machine_type='tpu7x-standard-1t',
      gke_accelerator='tpu7x',
      requires_workload_policy=False,
      supports_sub_slicing=False,
      topology='1x1x1',
      vms_per_slice=1,
  )
  assert get_placement_policy_name(system) == 'tpu7x-1x1x1-placement-policy'


def test_is_placement_policy_supported_returns_true_for_system_characteristics_supporting_workload_policy_and_having_valid_topology():
  """Workload policy required + valid topology => placement policy is used."""
  system = SystemCharacteristics(
      accelerator_type=AcceleratorType.TPU,
      chips_per_vm=1,
      device_type='tpu7x',
      gce_machine_type='tpu7x-standard-1t',
      gke_accelerator='tpu7x',
      requires_workload_policy=True,
      supports_sub_slicing=False,
      topology='1x1x1',
      vms_per_slice=1,
  )
  assert is_placement_policy_supported(system) is True


def test_is_placement_policy_supported_returns_false_for_system_characteristics_not_supporting_workload_policy_and_having_valid_topology():
  """No workload-policy requirement => placement policy is not used."""
  system = SystemCharacteristics(
      accelerator_type=AcceleratorType.TPU,
      chips_per_vm=1,
      device_type='tpu7x',
      gce_machine_type='tpu7x-standard-1t',
      gke_accelerator='tpu7x',
      requires_workload_policy=False,
      supports_sub_slicing=False,
      topology='1x1x1',
      vms_per_slice=1,
  )
  assert is_placement_policy_supported(system) is False


def test_is_placement_policy_supported_returns_false_for_system_characteristics_supporting_workload_policy_and_having_invalid_topology():
  """An unparsable topology string disables the placement policy."""
  system = SystemCharacteristics(
      accelerator_type=AcceleratorType.TPU,
      chips_per_vm=1,
      device_type='tpu7x',
      gce_machine_type='tpu7x-standard-1t',
      gke_accelerator='tpu7x',
      requires_workload_policy=True,
      supports_sub_slicing=False,
      topology='aaa',
      vms_per_slice=1,
  )
  assert is_placement_policy_supported(system) is False
Loading