
Commit d2fc215

Merge pull request #4 from LamaAni/add_envs
Added envs to default values
2 parents fb27554 + 03e292a commit d2fc215

5 files changed: +36 −31 lines changed

airflow_kubernetes_job_operator/kubernetes_job_operator.py

Lines changed: 25 additions & 24 deletions
@@ -7,7 +7,7 @@
 from airflow.utils.decorators import apply_defaults
 from airflow.operators import BaseOperator
 
-from .utils import to_kubernetes_valid_name, set_yaml_path_value
+from .utils import to_kubernetes_valid_name, set_yaml_path_value, get_yaml_path_value
 from .job_runner import JobRunner
 from .threaded_kubernetes_resource_watchers import (
     ThreadedKubernetesResourcesWatcher,
@@ -39,6 +39,7 @@ def __init__(
         image: str = None,
         namespace: str = None,
         name: str = None,
+        envs: dict = None,
         job_yaml=None,
         job_yaml_filepath=None,
         delete_policy: str = "IfSucceeded",
@@ -65,6 +66,8 @@ def __init__(
             image {str} -- The image to use in the pod. (default: None)
             namespace {str} -- The namespace to execute in. (default: None)
             name {str} -- Override automatic name creation for the job. (default: None)
+            envs {dict} -- A collection of environment variables that will be added to all
+                containers.
             job_yaml {dict|string} -- The job to execute as a yaml description. (default: None)
                 If None, will use a default job yaml command. In this case you must provide an
                 image.
@@ -98,9 +101,7 @@ def __init__(
         ), "job_yaml is None, and an image was not defined. Unknown image to execute."
 
         # use or load
-        job_yaml = job_yaml or self.read_job_yaml(
-            job_yaml_filepath or JOB_YAML_DEFAULT_FILE
-        )
+        job_yaml = job_yaml or self.read_job_yaml(job_yaml_filepath or JOB_YAML_DEFAULT_FILE)
 
         assert job_yaml is not None and (
             isinstance(job_yaml, (dict, str))
@@ -112,12 +113,15 @@ def __init__(
             "ifsucceeded",
         ], "the delete_policy must be one of: Never, Always, IfSucceeded"
 
+        assert envs is None or isinstance(envs, dict), "The env collection must be a dict or None"
+
         # override/replace properties
         self.name = name
         self.namespace = namespace
         self.command = command
         self.arguments = arguments
         self.image = image
+        self.envs = envs
 
         # kubernetes config properties.
         self.job_yaml = job_yaml
@@ -211,13 +215,9 @@ def log_job_result(
         self.log.error(f"Job Failed ({pod_count} pods), last pod/job status:")
 
         # log proper resource error
-        def log_resource_error(
-            resource_watcher: ThreadedKubernetesResourcesWatcher,
-        ):
+        def log_resource_error(resource_watcher: ThreadedKubernetesResourcesWatcher,):
             log_method = (
-                self.log.error
-                if resource_watcher.status == "Failed"
-                else self.log.info
+                self.log.error if resource_watcher.status == "Failed" else self.log.info
             )
             log_method(
                 "FINAL STATUS: "
@@ -268,15 +268,19 @@ def set_if_not_none(path_names: list, value):
 
         set_if_not_none(["metadata", "name"], self.name)
         set_if_not_none(["metadata", "namespace"], self.namespace)
-        set_if_not_none(
-            ["spec", "template", "spec", "containers", 0, "command"], self.command
-        )
-        set_if_not_none(
-            ["spec", "template", "spec", "containers", 0, "args"], self.arguments
-        )
-        set_if_not_none(
-            ["spec", "template", "spec", "containers", 0, "image"], self.image
-        )
+        set_if_not_none(["spec", "template", "spec", "containers", 0, "command"], self.command)
+        set_if_not_none(["spec", "template", "spec", "containers", 0, "args"], self.arguments)
+        set_if_not_none(["spec", "template", "spec", "containers", 0, "image"], self.image)
+
+        containers = get_yaml_path_value(self.job_yaml, ["spec", "template", "spec", "containers"])
+
+        if self.envs is not None and len(self.envs.keys()) > 0:
+            for container in containers:
+                if "env" not in container:
+                    container["env"] = []
+
+                for env_name in self.envs.keys():
+                    container["env"].append({"name": env_name, "value": self.envs[env_name]})
 
         # call parent.
         return super().pre_execute(context)
@@ -295,9 +299,7 @@ def execute(self, context):
 
         # Executing the job
         (job_watcher, namespace_watcher) = self.job_runner.execute_job(
-            self.job_yaml,
-            start_timeout=self.startup_timeout_seconds,
-            read_logs=self.get_logs,
+            self.job_yaml, start_timeout=self.startup_timeout_seconds, read_logs=self.get_logs,
         )
 
         self.__waiting_for_job_execution = False
@@ -333,8 +335,7 @@ def on_kill(self):
             self.log.info("Job deleted.")
         except Exception:
             self.log.error(
-                "Failed to delete an aborted/killed"
-                + " job! The job may still be executing."
+                "Failed to delete an aborted/killed" + " job! The job may still be executing."
             )
 
         return super().on_kill()
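
The effect of the new block in pre_execute is to copy every key/value pair from envs onto each container in the job's pod template as a standard Kubernetes env entry. A minimal standalone sketch of that transformation, assuming job_yaml is already a parsed dict and skipping the operator's get_yaml_path_value helper (the container name "main" is made up for illustration):

# Standalone sketch of the env-injection step; not the operator code itself.
job_yaml = {
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {"name": "main", "image": "ubuntu"},  # hypothetical container
                ]
            }
        }
    }
}

envs = {"PASS_ARG": "a test"}

containers = job_yaml["spec"]["template"]["spec"]["containers"]
if envs:
    for container in containers:
        # Ensure the container has an env list, then append one {name, value} entry per variable.
        container.setdefault("env", [])
        for env_name, env_value in envs.items():
            container["env"].append({"name": env_name, "value": env_value})

# containers[0]["env"] is now [{"name": "PASS_ARG", "value": "a test"}]

Note that the entries are appended, so env variables already present in the job yaml are kept alongside the injected ones.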

tests/airflow-webserver.pid

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+683

tests/dags/test_job_operator.py

Lines changed: 6 additions & 3 deletions
@@ -18,14 +18,17 @@ def read_job_yaml(fpath):
 success_job_yaml = read_job_yaml(__file__ + ".success.yaml")
 fail_job_yaml = read_job_yaml(__file__ + ".fail.yaml")
 
+envs = {"PASS_ARG": "a test"}
+
 # BashOperator(bash_command="date", task_id="test-bash", dag=dag)
 
-KubernetesJobOperator(task_id="test-job-success", job_yaml=success_job_yaml, dag=dag)
-KubernetesJobOperator(task_id="test-job-fail", job_yaml=fail_job_yaml, dag=dag)
+KubernetesJobOperator(task_id="test-job-success", job_yaml=success_job_yaml, envs=envs, dag=dag)
+KubernetesJobOperator(task_id="test-job-fail", job_yaml=fail_job_yaml, envs=envs, dag=dag)
 KubernetesJobOperator(
     task_id="test-job-overrides",
     dag=dag,
     image="ubuntu",
-    command=["bash", "-c", "echo start; sleep 10; echo end"],
+    envs=envs,
+    command=["bash", "-c", 'echo "Starting $PASS_ARG"; sleep 10; echo end'],
 )
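
The updated test DAG passes the same envs dict to every task; because the operator injects the variables into the container spec, the single-quoted command string reaches bash unexpanded and $PASS_ARG is resolved inside the pod. A hedged, self-contained sketch of such a task (the DAG id, dates, and import path below are assumptions for illustration, not taken from the test file):

# Sketch only: DAG arguments and import path are assumed, not from the test file.
from datetime import datetime

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator

with DAG("job-operator-envs-example", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    KubernetesJobOperator(
        task_id="test-job-overrides",
        image="ubuntu",
        envs={"PASS_ARG": "a test"},
        # Single quotes keep $PASS_ARG out of Python string interpolation; bash expands it
        # in the pod, where the operator has added PASS_ARG to the container's env.
        command=["bash", "-c", 'echo "Starting $PASS_ARG"; sleep 10; echo end'],
    )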

tests/dags/test_job_operator.py.fail.yaml

Lines changed: 2 additions & 2 deletions
@@ -19,8 +19,8 @@ spec:
         - -c
         - |
           #/usr/bin/env bash
-          echo "Starting"
-          TIC_COUNT=0
+          echo "Starting $PASS_ARG"
+          TIC_COUNT=5
           cur_count=0
           while true; do
             cur_count=$((cur_count + 1))

tests/dags/test_job_operator.py.success.yaml

Lines changed: 2 additions & 2 deletions
@@ -19,8 +19,8 @@ spec:
         - -c
         - |
           #/usr/bin/env bash
-          echo "Starting"
-          TIC_COUNT=0
+          echo "Starting $PASS_ARG"
+          TIC_COUNT=5
           cur_count=0
           while true; do
             cur_count=$((cur_count + 1))
