diff --git a/.github/workflows/kfp-integration-tests.yaml b/.github/workflows/kfp-integration-tests.yaml deleted file mode 100644 index 4b075ebd4..000000000 --- a/.github/workflows/kfp-integration-tests.yaml +++ /dev/null @@ -1,34 +0,0 @@ -name: KFP Integration Tests - -on: - push: - branches: - - main - pull_request: - -jobs: - kfp-launch: - runs-on: inux.24_04.16x - steps: - - name: Setup Python - uses: actions/setup-python@v2 - with: - python-version: "3.10" - architecture: x64 - - name: Checkout TorchX - uses: actions/checkout@v2 - - name: Install dependencies - run: | - set -eux - pip install -r dev-requirements.txt - python setup.py install - - name: Start Kubernetes - run: | - scripts/setup_minikube.sh - scripts/setup_kfp.sh - - - name: Run KFP Integration Tests - env: - KFP_NAMESPACE: kubeflow - INTEGRATION_TEST_STORAGE: torchx_minio://torchx/tests - run: scripts/kfpint.py --container_repo localhost:5000/torchx diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3caffe486..ca42445f1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -39,11 +39,6 @@ lintrunner init lintrunner -a ``` -## Integration Tests - -See the [KFP integration test](scripts/kfpint.py) file for more details on setup -and running them. - ## License By contributing to TorchX, you agree that your contributions will be licensed under the LICENSE file in the root directory of this source tree. diff --git a/README.md b/README.md index 1057bb370..50291cb93 100644 --- a/README.md +++ b/README.md @@ -55,9 +55,6 @@ pip install torchx # install torchx sdk and CLI -- all dependencies pip install "torchx[dev]" -# install torchx kubeflow pipelines (kfp) support -pip install "torchx[kfp]" - # install torchx Kubernetes / Volcano support pip install "torchx[kubernetes]" diff --git a/dev-requirements.txt b/dev-requirements.txt index 0abe53c1d..707de45ca 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -12,9 +12,6 @@ google-cloud-logging==3.10.0 google-cloud-runtimeconfig==0.34.0 hydra-core ipython -kfp==1.8.22 -# pin protobuf to the version that is required by kfp -protobuf==3.20.3 mlflow-skinny moto~=5.0.8 pyre-extensions @@ -45,4 +42,3 @@ grpcio==1.62.1 grpcio-status==1.48.1 googleapis-common-protos==1.63.0 google-api-core==2.18.0 -protobuf==3.20.3 # kfp==1.8.22 needs protobuf < 4 diff --git a/docs/source/basics.rst b/docs/source/basics.rst index 2673839e7..34d01d7c8 100644 --- a/docs/source/basics.rst +++ b/docs/source/basics.rst @@ -14,8 +14,7 @@ The top level modules in TorchX are: 4. :mod:`torchx.cli`: CLI tool 5. :mod:`torchx.runner`: given an app spec, submits the app as a job on a scheduler 6. :mod:`torchx.schedulers`: backend job schedulers that the runner supports -7. :mod:`torchx.pipelines`: adapters that convert the given app spec to a "stage" in an ML pipeline platform -8. :mod:`torchx.runtime`: util and abstraction libraries you can use in authoring apps (not app spec) +7. :mod:`torchx.runtime`: util and abstraction libraries you can use in authoring apps (not app spec) Below is a UML diagram @@ -32,8 +31,7 @@ the actual application. In scheduler lingo, this is a ``JobDefinition`` and a similar concept in Kubernetes is the ``spec.yaml``. To disambiguate between the application binary (logic) and the spec, we typically refer to a TorchX ``AppDef`` as an "app spec" or ``specs.AppDef``. It -is the common interface understood by ``torchx.runner`` -and ``torchx.pipelines`` allowing you to run your app as a standalone job +is the common interface understood by ``torchx.runner`` allowing you to run your app as a standalone job or as a stage in an ML pipeline. Below is a simple example of an ``specs.AppDef`` that echos "hello world" @@ -119,10 +117,6 @@ can be achieved through python function composition rather than object compositi However **we do not recommend component composition** for maintainability purposes. -**PROTIP 2:** To define dependencies between components, use a pipelining DSL. -See :ref:`basics:Pipeline Adapters` section below to understand how TorchX components -are used in the context of pipelines. - Before authoring your own component, browse through the library of :ref:`Components` that are included with TorchX to see if one fits your needs. @@ -141,34 +135,11 @@ There are two ways to access runners in TorchX: See :ref:`Schedulers` for a list of schedulers that the runner can launch apps to. -Pipeline Adapters -~~~~~~~~~~~~~~~~~~~~~~ -While runners launch components as standalone jobs, ``torchx.pipelines`` -makes it possible to plug components into an ML pipeline/workflow. For a -specific target pipeline platform (e.g. kubeflow pipelines), TorchX -defines an adapter that converts a TorchX app spec to whatever the -"stage" representation is in the target platform. For instance, -``torchx.pipelines.kfp`` adapter for kubeflow pipelines converts an -app spec to a ``kfp.ContainerOp`` (or more accurately, a kfp "component spec" yaml). - - -In most cases an app spec would map to a "stage" (or node) in a pipeline. -However advanced components, especially those that have a mini control flow -of its own (e.g. HPO), may map to a "sub-pipeline" or an "inline-pipeline". -The exact semantics of how these advanced components map to the pipeline -is dependent on the target pipeline platform. For example, if the -pipeline DSL allows dynamically adding stages to a pipeline from an upstream -stage, then TorchX may take advantage of such feature to "inline" the -sub-pipeline to the main pipeline. TorchX generally tries its best to adapt -app specs to the **most canonical** representation in the target pipeline platform. - -See :ref:`Pipelines` for a list of supported pipeline platforms. - Runtime ~~~~~~~~ .. important:: ``torchx.runtime`` is by no means is a requirement to use TorchX. If your infrastructure is fixed and you don't need your application - to be portable across different types of schedulers and pipelines, + to be portable across different types of schedulers, you can skip this section. Your application (not the app spec, but the actual app binary) has **ZERO** dependencies diff --git a/docs/source/conf.py b/docs/source/conf.py index 33e4e6fbd..0d56bb678 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -341,7 +341,7 @@ def handle_item(fieldarg, content): code_url = f"https://github.com/pytorch/torchx/archive/refs/heads/{notebook_version}.tar.gz" first_notebook_cell = f""" -!pip install torchx[kfp] +!pip install torchx !wget --no-clobber {code_url} !tar xf {notebook_version}.tar.gz --strip-components=1 @@ -351,7 +351,6 @@ def handle_item(fieldarg, content): sphinx_gallery_conf = { "examples_dirs": [ "../../torchx/examples/apps", - "../../torchx/examples/pipelines", ], "gallery_dirs": [ "examples_apps", diff --git a/docs/source/index.rst b/docs/source/index.rst index 484f92f83..2c4ebee1f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -4,8 +4,6 @@ TorchX ================== TorchX is a universal job launcher for PyTorch applications. -TorchX is designed to have fast iteration time for training/research and support -for E2E production ML pipelines when you're ready. **GETTING STARTED?** Follow the :ref:`quickstart guide`. @@ -91,14 +89,6 @@ Works With schedulers/fb/* -.. _Pipelines: -.. toctree:: - :maxdepth: 1 - :caption: Pipelines - - pipelines/kfp - pipelines/airflow.md - .. fbcode:: .. toctree:: @@ -116,7 +106,6 @@ Examples :caption: Examples examples_apps/index - examples_pipelines/index Components Library @@ -165,7 +154,6 @@ Reference runner schedulers workspace - pipelines .. toctree:: :maxdepth: 1 diff --git a/docs/source/pipelines.rst b/docs/source/pipelines.rst deleted file mode 100644 index 5569a2d37..000000000 --- a/docs/source/pipelines.rst +++ /dev/null @@ -1,15 +0,0 @@ -torchx.pipelines -================ - -.. automodule:: torchx.pipelines -.. currentmodule:: torchx.pipelines - -All Pipelines -~~~~~~~~~~~~~~~~ - -.. toctree:: - :maxdepth: 1 - :glob: - - pipelines/* - diff --git a/docs/source/pipelines/airflow.md b/docs/source/pipelines/airflow.md deleted file mode 100644 index a0861d0b5..000000000 --- a/docs/source/pipelines/airflow.md +++ /dev/null @@ -1,104 +0,0 @@ ---- -jupyter: - jupytext: - text_representation: - extension: .md - format_name: markdown - format_version: '1.3' - jupytext_version: 1.13.7 - kernelspec: - display_name: Python 3 - language: python - name: python3 ---- - -# Airflow - -For pipelines that support Python based execution you can directly use the -TorchX API. TorchX is designed to be easily integrated in to other applications -via the programmatic API. No special Airflow integrations are needed. - -With TorchX, you can use Airflow for the pipeline orchestration and run your -PyTorch application (i.e. distributed training) on a remote GPU cluster. - -```python -import datetime -import pendulum - -from airflow.utils.state import DagRunState, TaskInstanceState -from airflow.utils.types import DagRunType -from airflow.models.dag import DAG -from airflow.decorators import task - - -DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC") -DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) -``` - -To launch a TorchX job from Airflow you can create a Airflow Python task to -import the runner, launch the job and wait for it to complete. If you're running -on a remote cluster you may need to use the virtualenv task to install the -`torchx` package. - -```python -@task(task_id=f'hello_torchx') -def run_torchx(message): - """This is a function that will run within the DAG execution""" - from torchx.runner import get_runner - with get_runner() as runner: - # Run the utils.sh component on the local_cwd scheduler. - app_id = runner.run_component( - "utils.sh", - ["echo", message], - scheduler="local_cwd", - ) - - # Wait for the the job to complete - status = runner.wait(app_id, wait_interval=1) - - # Raise_for_status will raise an exception if the job didn't succeed - status.raise_for_status() - - # Finally we can print all of the log lines from the TorchX job so it - # will show up in the workflow logs. - for line in runner.log_lines(app_id, "sh", k=0): - print(line, end="") -``` - -Once we have the task defined we can put it into a Airflow DAG and run it like -normal. - -```python -from torchx.schedulers.ids import make_unique - -with DAG( - dag_id=make_unique('example_python_operator'), - schedule_interval=None, - start_date=DATA_INTERVAL_START, - catchup=False, - tags=['example'], -) as dag: - run_job = run_torchx("Hello, TorchX!") - - -dagrun = dag.create_dagrun( - state=DagRunState.RUNNING, - execution_date=DATA_INTERVAL_START, - data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END), - start_date=DATA_INTERVAL_END, - run_type=DagRunType.MANUAL, -) -ti = dagrun.get_task_instance(task_id="hello_torchx") -ti.task = dag.get_task(task_id="hello_torchx") -ti.run(ignore_ti_state=True) -assert ti.state == TaskInstanceState.SUCCESS -``` - -If all goes well you should see `Hello, TorchX!` printed above. - -## Next Steps - -* Checkout the [runner API documentation](../runner.rst) to learn more about - programmatic usage of TorchX -* Browse through the collection of [builtin components](../components/overview.rst) - which can be used in your Airflow pipeline diff --git a/docs/source/pipelines/kfp.rst b/docs/source/pipelines/kfp.rst deleted file mode 100644 index cabb04493..000000000 --- a/docs/source/pipelines/kfp.rst +++ /dev/null @@ -1,24 +0,0 @@ -Kubeflow Pipelines -====================== - -TorchX provides an adapter to run TorchX components as part of Kubeflow -Pipelines. See :ref:`examples_pipelines/index:KubeFlow Pipelines Examples`. - -.. image:: kfp_diagram.jpg - -torchx.pipelines.kfp -##################### - -.. image:: pipeline_kfp_diagram.png - -.. automodule:: torchx.pipelines.kfp -.. currentmodule:: torchx.pipelines.kfp - -.. currentmodule:: torchx.pipelines.kfp.adapter - -.. autofunction:: container_from_app -.. autofunction:: resource_from_app -.. autofunction:: component_from_app -.. autofunction:: component_spec_from_app - -.. autoclass:: ContainerFactory diff --git a/docs/source/pipelines/kfp_diagram.jpg b/docs/source/pipelines/kfp_diagram.jpg deleted file mode 100644 index 1fea3f1f0..000000000 Binary files a/docs/source/pipelines/kfp_diagram.jpg and /dev/null differ diff --git a/docs/source/pipelines/pipeline_kfp_diagram.png b/docs/source/pipelines/pipeline_kfp_diagram.png deleted file mode 100644 index e69de29bb..000000000 diff --git a/docs/source/quickstart.md b/docs/source/quickstart.md index c3aeae2e1..3816783e7 100644 --- a/docs/source/quickstart.md +++ b/docs/source/quickstart.md @@ -236,5 +236,4 @@ The `slurm` and `local_cwd` use the current environment so you can use `pip` and 1. Checkout other features of the [torchx CLI](cli.rst) 2. Take a look at the [list of schedulers](schedulers.rst) supported by the runner 3. Browse through the collection of [builtin components](components/overview.rst) -4. See which [ML pipeline platforms](pipelines.rst) you can run components on 5. See a [training app example](examples_apps/index.rst) diff --git a/scripts/kfpint.py b/scripts/kfpint.py deleted file mode 100755 index dd9cc6eac..000000000 --- a/scripts/kfpint.py +++ /dev/null @@ -1,282 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -This file runs the KFP integration tests on KFP cluster. There's a number of -environment variables that need to be setup as well as the cluster. - -See examples/pipelines/kfp/ for more information on how the cluster is used. - -Cluster setup: - -You'll need a KubeFlow Pipelines cluster with a torchserve instance with svc -name torchserve on the default namespace. - -* https://www.kubeflow.org/docs/started/installing-kubeflow/ -* https://github.com/pytorch/serve/blob/master/kubernetes/README.md - -Environment variables: - -``` -export KFP_NAMESPACE= -export INTEGRATION_TEST_STORAGE= -export TORCHX_CONTAINER_REPO= -``` - -Once you have everything setup you can just run: - -scripts/kfpint.py - - -""" - -import argparse -import asyncio -import dataclasses -import json -import os -import os.path -import shutil -import subprocess -import tempfile -import time -from contextlib import contextmanager -from typing import Any, Callable, Iterator, Optional, TypeVar - -import kfp - -from integ_test_utils import ( - build_images, - BuildInfo, - getenv_asserts, - MissingEnvError, - push_images, - run, - run_in_bg, -) -from torchx.util.types import none_throws -from urllib3.exceptions import MaxRetryError - -T = TypeVar("T") - - -def get_fn_name(fn: Callable[..., T]) -> str: - if hasattr(fn, "__qualname__"): - return fn.__qualname__ - elif hasattr(fn, "__name__"): - return fn.__name__ - else: - return "undefined" - - -def retry(f: Callable[..., T]) -> Callable[..., T]: - retries: int = 5 - backoff: int = 3 - - def wrapper(*args: Any, **kwargs: Any) -> T: - curr_retries = 0 - while True: - try: - return f(*args, **kwargs) - except: # noqa: B001 E722 - if curr_retries == retries: - raise - else: - sleep = backoff * 2**curr_retries - fn_name = get_fn_name(f) - print(f"retrying `{fn_name}` request after {sleep} seconds") - time.sleep(sleep) - curr_retries += 1 - continue - - return wrapper - - -@retry -def get_client(host: str) -> kfp.Client: - return kfp.Client(host=f"{host}/pipeline") - - -def get_free_port() -> int: - return 32001 - - -def enable_port_forward(local_port: int) -> "Optional[subprocess.Popen[str]]": - # Enable port forward via running background process. - # Kubernetes python does not support a clean way of - # Kubernetes python cli provides a socket, more info: - # https://github.com/kubernetes-client/python/blob/master/examples/pod_portforward.py - # The drawback of this method is that we have to monkey patch - # the urllib, which is used by the kfp client. - # This approach is more cleaner than to use the python cli directly. - try: - namespace = getenv_asserts("KFP_NAMESPACE") - except MissingEnvError: - print("Skipping port forward due to workflow executed without env variable") - return None - return run_in_bg( - "kubectl", - "port-forward", - "-n", - namespace, - "svc/ml-pipeline-ui", - f"{local_port}:80", - ) - - -META_FILE = "meta" -IMAGES_FILE = "images.tar.zst" - - -def save_advanced_pipeline_spec(path: str, build: BuildInfo) -> None: - print("generating advanced_pipeline spec") - - id = build.id - torchx_image = build.torchx_image - - STORAGE_PATH = os.getenv("INTEGRATION_TEST_STORAGE", "/tmp/storage") - root = os.path.join(STORAGE_PATH, id) - output = os.path.join(root, "output") - - save_pipeline_spec( - path, - "advanced_pipeline.py", - "--output_path", - output, - "--image", - torchx_image, - "--model_name", - f"tiny_image_net_{id}", - ) - - -def save_pipeline_spec(path: str, pipeline_file: str, *args: str) -> None: - print(f"generating pipeline spec for {pipeline_file}") - - with tempfile.TemporaryDirectory() as tmpdir: - run(os.path.join("torchx/examples/pipelines/kfp", pipeline_file), *args) - shutil.copy("pipeline.yaml", path) - - -@contextmanager -def path_or_tmp(path: Optional[str]) -> Iterator[str]: - if path: - os.makedirs(path, exist_ok=True) - yield path - else: - with tempfile.TemporaryDirectory() as tmpdir: - yield tmpdir - - -def _connection_error_message() -> str: - kfp_host = getenv_asserts("KFP_HOST") - return f""" - Unable to connect to kfp cluster using {kfp_host}. - Check that `kubectl` has proper credentials to execute port forward - """ - - -def save_build(path: str, build: BuildInfo) -> None: - meta_path = os.path.join(path, META_FILE) - with open(meta_path, "wt") as f: - json.dump(dataclasses.asdict(build), f) - - -def run_pipeline(build: BuildInfo, pipeline_file: str) -> object: - print(f"launching pipeline {pipeline_file}") - HOST: str = getenv_asserts("KFP_HOST") - - try: - client = get_client(HOST) - except MaxRetryError: - print(_connection_error_message()) - raise - resp = client.create_run_from_pipeline_package( - pipeline_file, - arguments={}, - experiment_name="integration-tests", - run_name=f"integration test {build.id} - {os.path.basename(pipeline_file)}", - ) - ui_url = f"{HOST}/#/runs/details/{resp.run_id}" - print(f"{resp.run_id} - launched! view run at {ui_url}") - return resp - - -def wait_for_pipeline( - resp: Any, # pyre-fixme: KFP doesn't have a response type -) -> None: - print(f"{resp.run_id} - waiting for completion") - result = resp.wait_for_run_completion( - timeout=1 * 60 * 60, - ) # 1 hour - print(f"{resp.run_id} - finished: {result}") - assert result.run.status == "Succeeded", "run didn't succeed" - - -async def exec_job() -> None: - parser = argparse.ArgumentParser(description="kfp integration test runner") - parser.add_argument( - "--path", - type=str, - help="path to place the files", - ) - parser.add_argument( - "--load", - help="if specified load the build from path instead of building", - action="store_true", - ) - parser.add_argument( - "--save", - help="if specified save the build to path and exit", - action="store_true", - ) - parser.add_argument("--container_repo", type=str) - args = parser.parse_args() - - with path_or_tmp(args.path) as path: - advanced_pipeline_file = os.path.join(path, "advanced_pipeline.yaml") - intro_pipeline_file = os.path.join(path, "intro_pipeline.yaml") - dist_pipeline_file = os.path.join(path, "dist_pipeline.yaml") - build = build_images() - try: - push_images(build, container_repo=args.container_repo) - except MissingEnvError as e: - print(f"Missing environments, only building: {e}") - return - finally: - save_advanced_pipeline_spec(advanced_pipeline_file, build) - save_pipeline_spec(intro_pipeline_file, "intro_pipeline.py") - save_pipeline_spec(dist_pipeline_file, "dist_pipeline.py") - - pipeline_files = [ - advanced_pipeline_file, - intro_pipeline_file, - dist_pipeline_file, - ] - responses = [ - run_pipeline(build, pipeline_file) for pipeline_file in pipeline_files - ] - for response in responses: - wait_for_pipeline(response) - - -def main() -> None: - port = get_free_port() - kfp_host = f"http://localhost:{port}" - os.environ["KFP_HOST"] = kfp_host - port_forward_proc = enable_port_forward(port) - try: - asyncio.run(exec_job()) - finally: - if port_forward_proc: - none_throws(port_forward_proc).kill() - - -if __name__ == "__main__": - main() diff --git a/scripts/setup_kfp.sh b/scripts/setup_kfp.sh deleted file mode 100755 index dd2c42ba4..000000000 --- a/scripts/setup_kfp.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -set -eux - -export PIPELINE_VERSION=1.8.5 -kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION" -kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io -kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION" -kubectl apply -f resources/kfp_volcano_role_binding.yaml -kubectl wait --namespace=kubeflow --for condition=available --timeout=10m deployments/metadata-grpc-deployment diff --git a/setup.py b/setup.py index 16981d5dc..c49705707 100644 --- a/setup.py +++ b/setup.py @@ -87,7 +87,6 @@ def get_nightly_version(): "google-cloud-logging>=3.0.0", "google-cloud-runtimeconfig>=0.33.2", ], - "kfp": ["kfp==1.6.2"], "kubernetes": ["kubernetes>=11"], "ray": ["ray>=1.12.1"], "dev": dev_reqs, diff --git a/torchx/components/__init__.py b/torchx/components/__init__.py index a215fc4ce..428ad9aee 100644 --- a/torchx/components/__init__.py +++ b/torchx/components/__init__.py @@ -298,13 +298,6 @@ def f(i: int, f: float, s: str, b: bool, l: List[str], d: Dict[str, str], *args) * ``*args=["--help"]``: ``torchx run comp.py:f -- --help`` * ``*args=["--i", "2"]``: ``torchx run comp.py:f --i 1 -- --i 2`` -Run in a Pipeline --------------------------------- - -The :ref:`torchx.pipelines` define adapters that -convert a torchx component into the object that represents a pipeline "stage" in the -target pipeline platform (see :ref:`Pipelines` for a list of supported pipeline orchestrators). - Additional Resources ----------------------- diff --git a/torchx/examples/pipelines/README.rst b/torchx/examples/pipelines/README.rst deleted file mode 100644 index 59f79deca..000000000 --- a/torchx/examples/pipelines/README.rst +++ /dev/null @@ -1,4 +0,0 @@ -Pipelines Examples -================== - -This contains examples of using TorchX components as part of pipelines. diff --git a/torchx/examples/pipelines/__init__.py b/torchx/examples/pipelines/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/examples/pipelines/kfp/README.rst b/torchx/examples/pipelines/kfp/README.rst deleted file mode 100644 index 26de2d7e0..000000000 --- a/torchx/examples/pipelines/kfp/README.rst +++ /dev/null @@ -1,6 +0,0 @@ -KubeFlow Pipelines Examples -########################### - -Each of these files is a python file that generates a Kubeflow Pipeline -definition that uses TorchX components. The generated ``pipeline.yaml`` files -can be uploaded to a KFP cluster to run the pipeline. diff --git a/torchx/examples/pipelines/kfp/__init__.py b/torchx/examples/pipelines/kfp/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/examples/pipelines/kfp/advanced_pipeline.py b/torchx/examples/pipelines/kfp/advanced_pipeline.py deleted file mode 100755 index e84e6ee52..000000000 --- a/torchx/examples/pipelines/kfp/advanced_pipeline.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -Advanced KubeFlow Pipelines Example -=================================== - -This is an example pipeline using KubeFlow Pipelines built with only TorchX -components. - -KFP adapters can be used transform the TorchX components directly into -something that can be used within KFP. -""" - -# %% -# Input Arguments -# ############### -# Lets first define some arguments for the pipeline. - -import argparse -import os.path -import sys -from typing import Dict - -import kfp -import torchx -from torchx import specs -from torchx.components.dist import ddp as dist_ddp -from torchx.components.serve import torchserve -from torchx.components.utils import copy as utils_copy, python as utils_python -from torchx.pipelines.kfp.adapter import container_from_app - - -parser = argparse.ArgumentParser(description="example kfp pipeline") - -# %% -# TorchX components are built around images. Depending on what scheduler -# you're using this can vary but for KFP these images are specified as -# docker containers. We have one container for the example apps and one for -# the standard built in apps. If you modify the torchx example code you'll -# need to rebuild the container before launching it on KFP - - -parser.add_argument( - "--image", - type=str, - help="docker image to use for the examples apps", - default=torchx.IMAGE, -) - -# %% -# Most TorchX components use -# `fsspec `_ to abstract -# away dealing with remote filesystems. This allows the components to take -# paths like ``s3://`` to make it easy to use cloud storage providers. -parser.add_argument( - "--output_path", - type=str, - help="path to place the data", - required=True, -) -parser.add_argument("--load_path", type=str, help="checkpoint path to load from") - -# %% -# This example uses the torchserve for inference so we need to specify some -# options. This assumes you have a TorchServe instance running in the same -# Kubernetes cluster with with the service name ``torchserve`` in the default -# namespace. -# -# See https://github.com/pytorch/serve/blob/master/kubernetes/README.md for info -# on how to setup TorchServe. -parser.add_argument( - "--management_api", - type=str, - help="path to the torchserve management API", - default="http://torchserve.default.svc.cluster.local:8081", -) -parser.add_argument( - "--model_name", - type=str, - help="the name of the inference model", - default="tiny_image_net", -) - -# %% Parse the arguments, you'll need to set these accordingly if running from a -# notebook. - - -if "NOTEBOOK" in globals(): - argv = [ - "--output_path", - "/tmp/output", - ] -else: - argv = sys.argv[1:] - -args: argparse.Namespace = parser.parse_args(argv) - -# %% -# Creating the Components -# ####################### -# The first step is downloading the data to somewhere we can work on it. For -# this we can just the builtin copy component. This component takes two valid -# fsspec paths and copies them from one to another. In this case we're using -# http as the source and a file under the output_path as the output. - - -data_path: str = os.path.join(args.output_path, "tiny-imagenet-200.zip") -copy_app: specs.AppDef = utils_copy( - "http://cs231n.stanford.edu/tiny-imagenet-200.zip", - data_path, - image=args.image, -) - -# %% -# The next component is for data preprocessing. This takes in the raw data from -# the previous operator and runs some transforms on it for use with the trainer. -# -# datapreproc outputs the data to a specified fsspec path. These paths are all -# specified ahead of time so we have a fully static pipeline. - - -processed_data_path: str = os.path.join(args.output_path, "processed") -datapreproc_app: specs.AppDef = utils_python( - "--output_path", - processed_data_path, - "--input_path", - data_path, - "--limit", - "100", - image=args.image, - m="torchx.examples.apps.datapreproc.datapreproc", - cpu=1, - memMB=1024, -) - -# %% -# Next we'll create the trainer component that takes in the training data from the -# previous datapreproc component. We've defined this in a separate component -# file as you normally would. -# -# Having a separate component file allows you to launch your trainer from the -# TorchX CLI via ``torchx run`` for fast iteration as well as run it from a -# pipeline in an automated fashion. - -# make sure examples is on the path -if "__file__" in globals(): - sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "..")) - - -logs_path: str = os.path.join(args.output_path, "logs") -models_path: str = os.path.join(args.output_path, "models") - -trainer_app: specs.AppDef = dist_ddp( - *( - "--output_path", - models_path, - "--load_path", - args.load_path or "", - "--log_path", - logs_path, - "--data_path", - processed_data_path, - "--epochs", - str(1), - ), - image=args.image, - m="torchx.examples.apps.lightning.train", - j="1x1", - # per node resource settings - cpu=1, - memMB=3000, -) - -# %% -# To have the tensorboard path show up in KFPs UI we need to some metadata so -# KFP knows where to consume the metrics from. -# -# This will get used when we create the KFP container. - - -ui_metadata: Dict[str, object] = { - "outputs": [ - { - "type": "tensorboard", - "source": os.path.join(logs_path, "lightning_logs"), - } - ] -} - -# %% -# For the inference, we're leveraging one of the builtin TorchX components. This -# component takes in a model and uploads it to the TorchServe management API -# endpoints. - - -serve_app: specs.AppDef = torchserve( - model_path=os.path.join(models_path, "model.mar"), - management_api=args.management_api, - image=args.image, - params={ - "model_name": args.model_name, - # set this to allocate a worker - # "initial_workers": 1, - }, -) - -# %% -# For model interpretability we're leveraging a custom component stored in it's -# own component file. This component takes in the output from datapreproc and -# train components and produces images with integrated gradient results. - -interpret_path: str = os.path.join(args.output_path, "interpret") -interpret_app: specs.AppDef = utils_python( - *( - "--load_path", - os.path.join(models_path, "last.ckpt"), - "--data_path", - processed_data_path, - "--output_path", - interpret_path, - ), - image=args.image, - m="torchx.examples.apps.lightning.interpret", -) - -# %% -# Pipeline Definition -# ################### -# The last step is to define the actual pipeline using the torchx components via -# the KFP adapter and export the pipeline package that can be uploaded to a KFP -# cluster. -# -# The KFP adapter currently doesn't track the input and outputs so the -# containers need to have their dependencies specified via `.after()`. -# -# We call `.set_tty()` to make the logs from the components more responsive for -# example purposes. - - -def pipeline() -> None: - # container_from_app creates a KFP container from the TorchX app - # definition. - copy = container_from_app(copy_app) - copy.container.set_tty() - - datapreproc = container_from_app(datapreproc_app) - datapreproc.container.set_tty() - datapreproc.after(copy) - - # For the trainer we want to log that UI metadata so you can access - # tensorboard from the UI. - trainer = container_from_app(trainer_app, ui_metadata=ui_metadata) - trainer.container.set_tty() - trainer.after(datapreproc) - - if False: - serve = container_from_app(serve_app) - serve.container.set_tty() - serve.after(trainer) - - if False: - # Serve and interpret only require the trained model so we can run them - # in parallel to each other. - interpret = container_from_app(interpret_app) - interpret.container.set_tty() - interpret.after(trainer) - - -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) - -with open("pipeline.yaml", "rt") as f: - print(f.read()) - -# %% -# Once this has all run you should have a pipeline file (typically -# pipeline.yaml) that you can upload to your KFP cluster via the UI or -# a kfp.Client. - -# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png' diff --git a/torchx/examples/pipelines/kfp/dist_pipeline.py b/torchx/examples/pipelines/kfp/dist_pipeline.py deleted file mode 100755 index 4cf8f2e05..000000000 --- a/torchx/examples/pipelines/kfp/dist_pipeline.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -Distributed KubeFlow Pipelines Example -====================================== - -This is an example KFP pipeline that uses resource_from_app to launch a -distributed operator using the kubernetes/volcano job scheduler. This only works -in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them. -""" - -import kfp -from torchx import specs -from torchx.pipelines.kfp.adapter import resource_from_app - - -def pipeline() -> None: - # First we define our AppDef for the component, we set - echo_app = specs.AppDef( - name="test-dist", - roles=[ - specs.Role( - name="dist-echo", - image="alpine", - entrypoint="/bin/echo", - args=["hello dist!"], - num_replicas=3, - ), - ], - ) - - # To convert the TorchX AppDef into a KFP container we use - # the resource_from_app adapter. This takes generates a KFP Kubernetes - # resource operator definition from the TorchX app def and instantiates it. - echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="default") - - -# %% -# To generate the pipeline definition file we need to call into the KFP compiler -# with our pipeline function. - -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) - -with open("pipeline.yaml", "rt") as f: - print(f.read()) - -# %% -# Once this has all run you should have a pipeline file (typically -# pipeline.yaml) that you can upload to your KFP cluster via the UI or -# a kfp.Client. -# -# See the -# `KFP SDK Examples `_ -# for more info on launching KFP pipelines. - -# %% -# See the :ref:`examples_pipelines/kfp/advanced_pipeline:Advanced KubeFlow Pipelines Example` for how to chain multiple -# components together and use builtin components. - - -# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png' diff --git a/torchx/examples/pipelines/kfp/intro_pipeline.py b/torchx/examples/pipelines/kfp/intro_pipeline.py deleted file mode 100755 index 07130b338..000000000 --- a/torchx/examples/pipelines/kfp/intro_pipeline.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -Intro KubeFlow Pipelines Example -================================ - -This an introductory pipeline using KubeFlow Pipelines built with only TorchX -components. - -TorchX is intended to allow making cross platform components. As such, we have -a standard definition that uses adapters to convert it to the specific -pipeline platform. This is an example of using the KFP adapter to run a TorchX -component as part of a KubeFlow Pipeline. - -TorchX tries to leverage standard mechanisms wherever possible. For KFP we use -the existing KFP pipeline definition syntax and add a single -`component_from_app` conversion step to convert a TorchX component into one -KFP can understand. - -Typically you have a separate component file but for this example we define the -AppDef inline. -""" - -import kfp -from torchx import specs -from torchx.pipelines.kfp.adapter import container_from_app - - -def pipeline() -> None: - # First we define our AppDef for the component. AppDef is a core part of TorchX - # and can be used to describe complex distributed multi container apps or - # just a single node component like here. - echo_app: specs.AppDef = specs.AppDef( - name="examples-intro", - roles=[ - specs.Role( - name="worker", - entrypoint="/bin/echo", - args=["Hello TorchX!"], - image="alpine", - ) - ], - ) - - # To convert the TorchX AppDef into a KFP container we use - # the container_from_app adapter. This takes generates a KFP component - # definition from the TorchX app def and instantiates it into a container. - echo_container: kfp.dsl.ContainerOp = container_from_app(echo_app) - - -# %% -# To generate the pipeline definition file we need to call into the KFP compiler -# with our pipeline function. - -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) - -with open("pipeline.yaml", "rt") as f: - print(f.read()) - -# %% -# Once this has all run you should have a pipeline file (typically -# pipeline.yaml) that you can upload to your KFP cluster via the UI or -# a kfp.Client. -# -# See the -# `KFP SDK Examples `_ -# for more info on launching KFP pipelines. - -# %% -# See the :ref:`examples_pipelines/kfp/advanced_pipeline:Advanced KubeFlow Pipelines Example` for how to chain multiple -# components together and use builtin components. - -# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png' diff --git a/torchx/examples/pipelines/kfp/test/__init__.py b/torchx/examples/pipelines/kfp/test/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py b/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py deleted file mode 100644 index f4d26da4b..000000000 --- a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import os -import os.path -import sys -import tempfile -import unittest - - -class KFPPipelineTest(unittest.TestCase): - def setUp(self) -> None: - self.dir = tempfile.TemporaryDirectory() # noqa: P201 - self.orig_dir = os.getcwd() - os.chdir(self.dir.name) - - def tearDown(self) -> None: - os.chdir(self.orig_dir) - self.dir.cleanup() - - def test_kfp_pipeline(self) -> None: - sys.argv = [ - "advanced_pipeline.py", - "--output_path", - "bar", - ] - from torchx.examples.pipelines.kfp import advanced_pipeline # noqa: F401 - - self.assertTrue(os.path.exists("pipeline.yaml")) - - def test_intro_pipeline(self) -> None: - sys.argv = ["intro_pipeline.py"] - from torchx.examples.pipelines.kfp import intro_pipeline # noqa: F401 - - self.assertTrue(os.path.exists("pipeline.yaml")) - - def test_dist_pipeline(self) -> None: - sys.argv = ["dist_pipeline.py"] - from torchx.examples.pipelines.kfp import dist_pipeline # noqa: F401 - - self.assertTrue(os.path.exists("pipeline.yaml")) diff --git a/torchx/pipelines/kfp/__init__.py b/torchx/pipelines/kfp/__init__.py deleted file mode 100644 index 1adeede50..000000000 --- a/torchx/pipelines/kfp/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -""" -This module contains adapters for converting TorchX components into KubeFlow -Pipeline components. - -The current KFP adapters only support single node (1 role and 1 replica) -components. -""" - -import kfp - -from .version import __version__ as __version__ # noqa F401 - - -def _check_kfp_version() -> None: - if not kfp.__version__.startswith("1."): - raise ImportError( - f"Only kfp version 1.x.x is supported! kfp version {kfp.__version__}" - ) - - -_check_kfp_version() diff --git a/torchx/pipelines/kfp/adapter.py b/torchx/pipelines/kfp/adapter.py deleted file mode 100644 index 427f25f44..000000000 --- a/torchx/pipelines/kfp/adapter.py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import json -import os -import os.path -import shlex -from typing import Mapping, Optional, Tuple - -import yaml -from kfp import components, dsl - -# @manual=fbsource//third-party/pypi/kfp:kfp -from kfp.components.structures import ComponentSpec, OutputSpec -from kubernetes.client.models import ( - V1ContainerPort, - V1EmptyDirVolumeSource, - V1Volume, - V1VolumeMount, -) -from torchx.schedulers.kubernetes_scheduler import app_to_resource, pod_labels -from torchx.specs import api -from typing_extensions import Protocol - -from .version import __version__ as __version__ # noqa F401 - - -def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]: - """ - component_spec_from_app takes in a TorchX component and generates the yaml - spec for it. Notably this doesn't apply resources or port_maps since those - must be applied at runtime which is why it returns the role spec as well. - - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import component_spec_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> component_spec_from_app(app_def) - ('description: ...', Role(...)) - """ - assert len(app.roles) == 1, f"KFP adapter only support one role, got {app.roles}" - - role = app.roles[0] - assert ( - role.num_replicas - == 1 - # pyre-fixme[16]: `AppDef` has no attribute `num_replicas`. - ), f"KFP adapter only supports one replica, got {app.num_replicas}" - - command = [role.entrypoint, *role.args] - - spec = { - "name": f"{app.name}-{role.name}", - "description": f"KFP wrapper for TorchX component {app.name}, role {role.name}", - "implementation": { - "container": { - "image": role.image, - "command": command, - "env": role.env, - } - }, - "outputs": [], - } - return yaml.dump(spec), role - - -class ContainerFactory(Protocol): - """ - ContainerFactory is a protocol that represents a function that when called produces a - kfp.dsl.ContainerOp. - """ - - def __call__(self, *args: object, **kwargs: object) -> dsl.ContainerOp: ... - - -class KFPContainerFactory(ContainerFactory, Protocol): - """ - KFPContainerFactory is a ContainerFactory that also has some KFP metadata - attached to it. - """ - - component_spec: ComponentSpec - - -METADATA_FILE = "/tmp/outputs/mlpipeline-ui-metadata/data.json" - - -def component_from_app( - app: api.AppDef, ui_metadata: Optional[Mapping[str, object]] = None -) -> ContainerFactory: - """ - component_from_app takes in a TorchX component/AppDef and returns a KFP - ContainerOp factory. This is equivalent to the - `kfp.components.load_component_from_* - `_ - methods. - - Args: - app: The AppDef to generate a KFP container factory for. - ui_metadata: KFP UI Metadata to output so you can have model results show - up in the UI. See - https://www.kubeflow.org/docs/components/pipelines/legacy-v1/sdk/output-viewer/ - for more info on the format. - - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import component_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> component_from_app(app_def) - - """ - - role_spec: api.Role - spec, role_spec = component_spec_from_app(app) - resources: api.Resource = role_spec.resource - assert ( - len(resources.capabilities) == 0 - ), f"KFP doesn't support capabilities, got {resources.capabilities}" - component_factory: KFPContainerFactory = components.load_component_from_text(spec) - - if ui_metadata is not None: - # pyre-fixme[16]: `ComponentSpec` has no attribute `outputs` - component_factory.component_spec.outputs.append( - OutputSpec( - name="mlpipeline-ui-metadata", - type="MLPipeline UI Metadata", - description="ui metadata", - ) - ) - - def factory_wrapper(*args: object, **kwargs: object) -> dsl.ContainerOp: - c = component_factory(*args, **kwargs) - container = c.container - - if ui_metadata is not None: - # We generate the UI metadata from the sidecar so we need to make - # both the container and the sidecar share the same tmp directory so - # the outputs appear in the original container. - c.add_volume(V1Volume(name="tmp", empty_dir=V1EmptyDirVolumeSource())) - container.add_volume_mount( - V1VolumeMount( - name="tmp", - mount_path="/tmp/", - ) - ) - c.output_artifact_paths["mlpipeline-ui-metadata"] = METADATA_FILE - c.add_sidecar(_ui_metadata_sidecar(ui_metadata)) - - cpu = resources.cpu - if cpu >= 0: - cpu_str = f"{int(cpu*1000)}m" - container.set_cpu_request(cpu_str) - container.set_cpu_limit(cpu_str) - mem = resources.memMB - if mem >= 0: - mem_str = f"{int(mem)}M" - container.set_memory_request(mem_str) - container.set_memory_limit(mem_str) - gpu = resources.gpu - if gpu > 0: - container.set_gpu_limit(str(gpu)) - - for name, port in role_spec.port_map.items(): - container.add_port( - V1ContainerPort( - name=name, - container_port=port, - ), - ) - - c.pod_labels.update(pod_labels(app, 0, role_spec, 0, app.name)) - - return c - - return factory_wrapper - - -def _ui_metadata_sidecar( - ui_metadata: Mapping[str, object], image: str = "alpine" -) -> dsl.Sidecar: - shell_encoded = shlex.quote(json.dumps(ui_metadata)) - dirname = os.path.dirname(METADATA_FILE) - return dsl.Sidecar( - name="ui-metadata-sidecar", - image=image, - command=[ - "sh", - "-c", - f"mkdir -p {dirname}; echo {shell_encoded} > {METADATA_FILE}", - ], - mirror_volume_mounts=True, - ) - - -def container_from_app( - app: api.AppDef, - *args: object, - ui_metadata: Optional[Mapping[str, object]] = None, - **kwargs: object, -) -> dsl.ContainerOp: - """ - container_from_app transforms the app into a KFP component and returns a - corresponding ContainerOp instance. - - See component_from_app for description on the arguments. Any unspecified - arguments are passed through to the KFP container factory method. - - >>> import kfp - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import container_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> def pipeline(): - ... trainer = container_from_app(app_def) - ... print(trainer) - >>> kfp.compiler.Compiler().compile( - ... pipeline_func=pipeline, - ... package_path="/tmp/pipeline.yaml", - ... ) - {'ContainerOp': {... 'name': 'trainer-trainer', ...}} - """ - factory = component_from_app(app, ui_metadata) - return factory(*args, **kwargs) - - -def resource_from_app( - app: api.AppDef, - queue: str, - service_account: Optional[str] = None, -) -> dsl.ResourceOp: - """ - resource_from_app generates a KFP ResourceOp from the provided app that uses - the Volcano job scheduler on Kubernetes to run distributed apps. See - https://volcano.sh/en/docs/ for more info on Volcano and how to install. - - Args: - app: The torchx AppDef to adapt. - queue: the Volcano queue to schedule the operator in. - - >>> import kfp - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import resource_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)], - ... ) - >>> def pipeline(): - ... trainer = resource_from_app(app_def, queue="test") - ... print(trainer) - >>> kfp.compiler.Compiler().compile( - ... pipeline_func=pipeline, - ... package_path="/tmp/pipeline.yaml", - ... ) - {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}} - """ - return dsl.ResourceOp( - name=app.name, - action="create", - success_condition="status.state.phase = Completed", - failure_condition="status.state.phase = Failed", - k8s_resource=app_to_resource(app, queue, service_account=service_account), - ) diff --git a/torchx/pipelines/kfp/test/__init__.py b/torchx/pipelines/kfp/test/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/torchx/pipelines/kfp/test/adapter_test.py b/torchx/pipelines/kfp/test/adapter_test.py deleted file mode 100644 index df7b743a8..000000000 --- a/torchx/pipelines/kfp/test/adapter_test.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import os.path -import tempfile -import unittest -from typing import Callable, List - -import torchx -import yaml -from kfp import compiler, components, dsl -from kubernetes.client.models import V1ContainerPort, V1ResourceRequirements -from torchx.pipelines.kfp.adapter import ( - component_from_app, - component_spec_from_app, - container_from_app, - ContainerFactory, -) -from torchx.specs import api - - -class KFPSpecsTest(unittest.TestCase): - """ - tests KFP components using torchx.specs.api - """ - - def _test_app(self) -> api.AppDef: - trainer_role = api.Role( - name="trainer", - image="pytorch/torchx:latest", - entrypoint="main", - args=["--output-path", "blah"], - env={"FOO": "bar"}, - resource=api.Resource( - cpu=2, - memMB=3000, - gpu=4, - ), - port_map={"foo": 1234}, - num_replicas=1, - ) - - return api.AppDef("test", roles=[trainer_role]) - - def _compile_pipeline(self, pipeline: Callable[[], None]) -> None: - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_file = os.path.join(tmpdir, "pipeline.yaml") - compiler.Compiler().compile(pipeline, pipeline_file) - with open(pipeline_file, "r") as f: - data = yaml.safe_load(f) - - spec = data["spec"] - templates = spec["templates"] - self.assertGreaterEqual(len(templates), 2) - - def test_component_spec_from_app(self) -> None: - app = self._test_app() - - spec, role = component_spec_from_app(app) - self.assertIsNotNone(components.load_component_from_text(spec)) - self.assertEqual(role.resource, app.roles[0].resource) - self.assertEqual( - spec, - """description: KFP wrapper for TorchX component test, role trainer -implementation: - container: - command: - - main - - --output-path - - blah - env: - FOO: bar - image: pytorch/torchx:latest -name: test-trainer -outputs: [] -""", - ) - - def test_pipeline(self) -> None: - app = self._test_app() - kfp_copy: ContainerFactory = component_from_app(app) - - def pipeline() -> None: - a = kfp_copy() - resources: V1ResourceRequirements = a.container.resources - self.assertEqual( - resources, - V1ResourceRequirements( - limits={ - "cpu": "2000m", - "memory": "3000M", - "nvidia.com/gpu": "4", - }, - requests={ - "cpu": "2000m", - "memory": "3000M", - }, - ), - ) - ports: List[V1ContainerPort] = a.container.ports - self.assertEqual( - ports, - [V1ContainerPort(name="foo", container_port=1234)], - ) - - b = kfp_copy() - b.after(a) - - self._compile_pipeline(pipeline) - - def test_pipeline_metadata(self) -> None: - app = self._test_app() - metadata = {} - kfp_copy: ContainerFactory = component_from_app(app, metadata) - - def pipeline() -> None: - a = kfp_copy() - self.assertEqual(len(a.volumes), 1) - self.assertEqual(len(a.container.volume_mounts), 1) - self.assertEqual(len(a.sidecars), 1) - self.assertEqual( - a.output_artifact_paths["mlpipeline-ui-metadata"], - "/tmp/outputs/mlpipeline-ui-metadata/data.json", - ) - self.assertEqual( - a.pod_labels, - { - "app.kubernetes.io/instance": "test", - "app.kubernetes.io/managed-by": "torchx.pytorch.org", - "app.kubernetes.io/name": "test", - "torchx.pytorch.org/version": torchx.__version__, - "torchx.pytorch.org/app-name": "test", - "torchx.pytorch.org/role-index": "0", - "torchx.pytorch.org/role-name": "trainer", - "torchx.pytorch.org/replica-id": "0", - }, - ) - - self._compile_pipeline(pipeline) - - def test_container_from_app(self) -> None: - app: api.AppDef = self._test_app() - - def pipeline() -> None: - a: dsl.ContainerOp = container_from_app(app) - b: dsl.ContainerOp = container_from_app(app) - b.after(a) - - self._compile_pipeline(pipeline) diff --git a/torchx/pipelines/kfp/test/suites.py b/torchx/pipelines/kfp/test/suites.py deleted file mode 100644 index bb37bff19..000000000 --- a/torchx/pipelines/kfp/test/suites.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -import os -import random -import unittest -from itertools import chain - - -def _circleci_parallelism(suite: unittest.TestSuite) -> unittest.TestSuite: - """Allow for parallelism in CircleCI for speedier tests..""" - if int(os.environ.get("CIRCLE_NODE_TOTAL", 0)) <= 1: - # either not running on circleci, or we're not using parallelism. - return suite - # tests are automatically sorted by discover, so we will get the same ordering - # on all hosts. - total = int(os.environ["CIRCLE_NODE_TOTAL"]) - index = int(os.environ["CIRCLE_NODE_INDEX"]) - - # right now each test is corresponds to a /file/. Certain files are slower than - # others, so we want to flatten it - # pyre-fixme[16]: `TestCase` has no attribute `_tests`. - tests = [testfile._tests for testfile in suite._tests] - tests = list(chain.from_iterable(tests)) - random.Random(42).shuffle(tests) - tests = [t for i, t in enumerate(tests) if i % total == index] - return unittest.TestSuite(tests) - - -def unittests() -> unittest.TestSuite: - """ - Short tests. - - Runs on CircleCI on every commit. Returns everything in the tests root directory. - """ - test_loader = unittest.TestLoader() - test_suite = test_loader.discover( - "torchx/kfp", pattern="*_test.py", top_level_dir="." - ) - test_suite = _circleci_parallelism(test_suite) - return test_suite - - -if __name__ == "__main__": - runner = unittest.TextTestRunner() - runner.run(unittests()) diff --git a/torchx/pipelines/kfp/test/version_test.py b/torchx/pipelines/kfp/test/version_test.py deleted file mode 100644 index f932f5b7b..000000000 --- a/torchx/pipelines/kfp/test/version_test.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import importlib -import unittest -from unittest.mock import patch - - -class VersionTest(unittest.TestCase): - def test_can_get_version(self) -> None: - import torchx.pipelines.kfp - - self.assertIsNotNone(torchx.pipelines.kfp.__version__) - - def test_kfp_1x(self) -> None: - import torchx.pipelines.kfp - - with patch("kfp.__version__", "2.0.1"): - with self.assertRaisesRegex(ImportError, "Only kfp version"): - importlib.reload(torchx.pipelines.kfp) - - with patch("kfp.__version__", "1.5.0"): - importlib.reload(torchx.pipelines.kfp) diff --git a/torchx/pipelines/kfp/version.py b/torchx/pipelines/kfp/version.py deleted file mode 100644 index a9244e289..000000000 --- a/torchx/pipelines/kfp/version.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -# Follows PEP-0440 version scheme guidelines -# https://www.python.org/dev/peps/pep-0440/#version-scheme -# -# Examples: -# 0.1.0.devN # Developmental release -# 0.1.0aN # Alpha release -# 0.1.0bN # Beta release -# 0.1.0rcN # Release Candidate -# 0.1.0 # Final release -__version__ = "0.1.0.dev0"