Skip to content

Commit 5110fdd

Browse files
authored
Create module and action plugins for assemble_cluster_template (#164)
* Add module and action for cloudera.cluster.assemble_cluster_template * Update assemble_cluster_template to handle JSON and process idempotent and unique keys * Update to handle aliases, temp file cleanup, documentation for assemble_cluster_template action plugin * Add merge logic to assemble_cluster_template module plugin Signed-off-by: Webster Mudge <wmudge@cloudera.com>
1 parent 03de94d commit 5110fdd

File tree

2 files changed

+626
-0
lines changed

2 files changed

+626
-0
lines changed
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
# Copyright 2023 Cloudera, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import json
16+
import os
17+
import re
18+
import tempfile
19+
20+
from ansible import constants as C
21+
from ansible.errors import (
22+
AnsibleAction,
23+
AnsibleError,
24+
_AnsibleActionDone,
25+
AnsibleActionFail,
26+
)
27+
from ansible.module_utils.common.text.converters import to_native, to_text
28+
from ansible.module_utils.parsing.convert_bool import boolean
29+
from ansible.plugins.action import ActionBase
30+
from ansible.utils.hashing import checksum_s
31+
32+
33+
class ActionModule(ActionBase):
34+
TRANSFERS_FILES = True
35+
36+
MERGED = {}
37+
IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"]
38+
UNIQUE_IDS = ["repositories"]
39+
40+
def update_object(self, base, template, breadcrumbs=""):
41+
if isinstance(base, dict) and isinstance(template, dict):
42+
self.update_dict(base, template, breadcrumbs)
43+
return True
44+
elif isinstance(base, list) and isinstance(template, list):
45+
self.update_list(base, template, breadcrumbs)
46+
return True
47+
return False
48+
49+
def update_dict(self, base, template, breadcrumbs=""):
50+
for key, value in template.items():
51+
crumb = breadcrumbs + "/" + key
52+
53+
if key in self.IDEMPOTENT_IDS:
54+
if base[key] != value:
55+
self._display.error(
56+
"Objects with distinct IDs should not be merged: " + crumb
57+
)
58+
continue
59+
60+
if key not in base:
61+
base[key] = value
62+
elif not self.update_object(base[key], value, crumb) and base[key] != value:
63+
self._display.warning(
64+
f"Value being overwritten for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
65+
)
66+
base[key] = value
67+
68+
if key in self.UNIQUE_IDS:
69+
base[key] = list(set(base[key]))
70+
71+
def update_list(self, base, template, breadcrumbs=""):
72+
for item in template:
73+
if isinstance(item, dict):
74+
for attr in self.IDEMPOTENT_IDS:
75+
if attr in item:
76+
idempotent_id = attr
77+
break
78+
else:
79+
idempotent_id = None
80+
if idempotent_id:
81+
namesake = [
82+
i for i in base if i[idempotent_id] == item[idempotent_id]
83+
]
84+
if namesake:
85+
self.update_dict(
86+
namesake[0],
87+
item,
88+
breadcrumbs
89+
+ "/["
90+
+ idempotent_id
91+
+ "="
92+
+ item[idempotent_id]
93+
+ "]",
94+
)
95+
continue
96+
base.append(item)
97+
base.sort(key=lambda x: json.dumps(x, sort_keys=True))
98+
99+
def assemble_fragments(
100+
self, assembled_file, src_path, regex=None, ignore_hidden=True, decrypt=True
101+
):
102+
# By file name sort order
103+
for f in (
104+
to_text(p, errors="surrogate_or_strict")
105+
for p in sorted(os.listdir(src_path))
106+
):
107+
# Filter by regexp
108+
if regex and not regex.search(f):
109+
continue
110+
111+
# Read and process the fragment
112+
fragment = os.path.join(src_path, f)
113+
if not os.path.isfile(fragment) or (
114+
ignore_hidden and os.path.basename(fragment).startswith(".")
115+
):
116+
continue
117+
118+
with open(
119+
self._loader.get_real_file(fragment, decrypt=decrypt),
120+
"r",
121+
encoding="utf-8",
122+
) as fragment_file:
123+
try:
124+
self.update_object(self.MERGED, json.loads(fragment_file.read()))
125+
except json.JSONDecodeError as e:
126+
raise AnsibleActionFail(
127+
message=f"JSON parsing error: {to_text(e.msg)}",
128+
obj=to_native(e),
129+
)
130+
131+
# Write out the final assembly
132+
json.dump(self.MERGED, assembled_file, indent=2, sort_keys=False)
133+
134+
# Flush the assembled file handle
135+
assembled_file.flush()
136+
137+
def run(self, tmp=None, task_vars=None):
138+
self._supports_check_mode = False
139+
140+
result = super(ActionModule, self).run(tmp, task_vars)
141+
142+
del tmp # legacy
143+
if task_vars is None:
144+
task_vars = dict()
145+
146+
# Handle aliases
147+
src = self._task.args.get("src", None)
148+
if src is None:
149+
src = self._task.args.get("cluster_template_src", None)
150+
151+
dest = self._task.args.get("dest", None)
152+
if dest is None:
153+
dest = self._task.args.get("cluster_template")
154+
155+
regexp = self._task.args.get("regexp", None)
156+
if regexp is None:
157+
regexp = self._task.args.get("filter", None)
158+
159+
remote_src = boolean(self._task.args.get("remote_src", False))
160+
follow = boolean(self._task.args.get("follow", False))
161+
ignore_hidden = boolean(self._task.args.get("ignore_hidden", True))
162+
decrypt = self._task.args.pop("decrypt", True)
163+
164+
try:
165+
if src is None or dest is None:
166+
raise AnsibleActionFail("Both 'src' and 'dest' are required")
167+
168+
# If src files are on the remote host, run the module
169+
if boolean(remote_src, strict=False):
170+
result.update(
171+
self._execute_module(
172+
module_name="cloudera.cluster.assemble_cluster_template",
173+
task_vars=task_vars,
174+
)
175+
)
176+
raise _AnsibleActionDone()
177+
else:
178+
try:
179+
src = self._find_needle("files", src)
180+
except AnsibleError as e:
181+
raise AnsibleActionFail(to_native(e))
182+
183+
if not os.path.isdir(src):
184+
raise AnsibleActionFail(f"Source, {src}, is not a directory")
185+
186+
# Compile the regexp
187+
compiled = None
188+
if regexp is not None:
189+
try:
190+
compiled = re.compile(regexp)
191+
except re.error as e:
192+
raise AnsibleActionFail(
193+
message=f"Regular expression, {regexp}, is invalid: {to_native(e)}"
194+
)
195+
196+
# Assemble the src files into output file
197+
with tempfile.NamedTemporaryFile(
198+
mode="w", encoding="utf-8", dir=C.DEFAULT_LOCAL_TMP
199+
) as assembled:
200+
self.assemble_fragments(
201+
assembled,
202+
src,
203+
regex=compiled,
204+
ignore_hidden=ignore_hidden,
205+
decrypt=decrypt,
206+
)
207+
208+
# Gather the checksums for assembled file and destination file
209+
assembled_checksum = checksum_s(assembled.name)
210+
211+
dest = self._remote_expand_user(dest)
212+
dest_stat = self._execute_remote_stat(
213+
dest, all_vars=task_vars, follow=follow
214+
)
215+
216+
# Prepare the task arguments for the called submodules
217+
submodule_args = self._task.args.copy()
218+
219+
# Purge non-submodule arguments
220+
for o in [
221+
"cluster_template_src",
222+
"cluster_template",
223+
"remote_src",
224+
"regexp",
225+
"filter",
226+
"ignore_hidden",
227+
"decrypt",
228+
]:
229+
submodule_args.pop(o, None)
230+
231+
# Update the 'dest' arg
232+
submodule_args.update(dest=dest)
233+
234+
if assembled_checksum != dest_stat["checksum"]:
235+
diff = {}
236+
237+
if self._task.diff:
238+
diff = self._get_diff_data(dest, assembled.name, task_vars)
239+
240+
# Define a temporary remote path for the remote copy
241+
remote_path = self._connection._shell.join_path(
242+
self._connection._shell.tmpdir, "assembled_cluster_template"
243+
)
244+
245+
# Transfer the file to the remote path
246+
transfered = self._transfer_file(assembled.name, remote_path)
247+
248+
# Update the file permissions on the remote file
249+
self._fixup_perms2((self._connection._shell.tmpdir, remote_path))
250+
251+
# Update the 'src' arg with the temporary remote file
252+
submodule_args.update(
253+
dict(
254+
src=transfered,
255+
)
256+
)
257+
258+
# Execute the copy
259+
copy = self._execute_module(
260+
module_name="ansible.legacy.copy",
261+
module_args=submodule_args,
262+
task_vars=task_vars,
263+
)
264+
265+
if diff:
266+
copy.update(diff=diff)
267+
268+
result.update(copy)
269+
else:
270+
# Gather details on the existing file
271+
file = self._execute_module(
272+
module_name="ansible.legacy.file",
273+
module_args=submodule_args,
274+
task_vars=task_vars,
275+
)
276+
277+
result.update(file)
278+
except AnsibleAction as e:
279+
result.update(e.result)
280+
finally:
281+
self._remove_tmp_path(self._connection._shell.tmpdir)
282+
283+
return result

0 commit comments

Comments
 (0)