diff --git a/changelogs/fragments/173-storagebox.yml b/changelogs/fragments/173-storagebox.yml new file mode 100644 index 00000000..94834689 --- /dev/null +++ b/changelogs/fragments/173-storagebox.yml @@ -0,0 +1,2 @@ +minor_changes: + - "storagebox* modules - the code for the old API (that has been removed by Hetzner) has been replaced by hard-coding the result of the API, namely that no storagebox of this ID exists (https://github.com/ansible-collections/community.hrobot/pull/173)." diff --git a/plugins/modules/storagebox.py b/plugins/modules/storagebox.py index df59bd8a..cab18953 100644 --- a/plugins/modules/storagebox.py +++ b/plugins/modules/storagebox.py @@ -124,10 +124,8 @@ from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -139,21 +137,6 @@ api_fetch_url_json, ) -try: - from urllib.parse import urlencode -except ImportError: - # Python 2.x fallback: - from urllib import urlencode - - -PARAMETERS_LEGACY = { - 'name': ('name', 'storagebox_name'), - 'webdav': ('webdav', 'webdav'), - 'samba': ('samba', 'samba'), - 'ssh': ('ssh', 'ssh'), - 'external_reachability': ('external_reachability', 'external_reachability'), - 'zfs': ('zfs', 'zfs'), -} UPDATE_PARAMETERS = { 'name': ('name', ['name'], 'name'), @@ -171,11 +154,6 @@ PARAMETERS.update(ACTION_PARAMETERS) -def extract_legacy(result): - sb = result['storagebox'] - return {key: sb.get(key) for key, dummy in PARAMETERS_LEGACY.values()} - - def extract(result): sb = result['storage_box'] @@ -221,101 +199,68 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - url = "{0}/storagebox/{1}".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=['STORAGEBOX_NOT_FOUND']) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - before = extract_legacy(result) - after = dict(before) - - for option_name, (data_name, change_name) in PARAMETERS_LEGACY.items(): - value = module.params[option_name] - if value is not None: - if before[data_name] != value: - after[data_name] = value - if isinstance(value, bool): - changes[change_name] = str(value).lower() - else: - changes[change_name] = value - - if changes and not module.check_mode: - headers = {"Content-type": "application/x-www-form-urlencoded"} - result, error = fetch_url_json( - module, - url, - data=urlencode(changes), - headers=headers, - method='POST', - accept_errors=['INVALID_INPUT'], - ) - if error: - invalid = result['error'].get('invalid') or [] - module.fail_json(msg='The values to update were invalid ({0})'.format(', '.join(invalid))) - after = extract_legacy(result) - - else: - # NEW API! - url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) - result, dummy, error = api_fetch_url_json(module, url, accept_errors=['not_found']) + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) + + url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) + result, dummy, error = api_fetch_url_json(module, url, accept_errors=['not_found']) + if error: + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) + + before = extract(result) + after = dict(before) + + update = {} + for option_name, (data_name, dummy, change_name) in UPDATE_PARAMETERS.items(): + value = module.params[option_name] + if value is not None: + if before[data_name] != value: + after[data_name] = value + changes[change_name] = value + update[change_name] = value + + action = {} + update_after_update = {} + for option_name, (data_name, dummy, change_name) in ACTION_PARAMETERS.items(): + value = module.params[option_name] + if value is not None: + if before[data_name] != value: + after[data_name] = value + update_after_update[data_name] = value + changes[change_name] = value + action[change_name] = value + + if update and not module.check_mode: + headers = {"Content-type": "application/json"} + result, dummy, error = api_fetch_url_json( + module, + url, + data=module.jsonify(update), + headers=headers, + method='PUT', + accept_errors=['invalid_input'], + ) if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - before = extract(result) - after = dict(before) - - update = {} - for option_name, (data_name, dummy, change_name) in UPDATE_PARAMETERS.items(): - value = module.params[option_name] - if value is not None: - if before[data_name] != value: - after[data_name] = value - changes[change_name] = value - update[change_name] = value - - action = {} - update_after_update = {} - for option_name, (data_name, dummy, change_name) in ACTION_PARAMETERS.items(): - value = module.params[option_name] - if value is not None: - if before[data_name] != value: - after[data_name] = value - update_after_update[data_name] = value - changes[change_name] = value - action[change_name] = value - - if update and not module.check_mode: - headers = {"Content-type": "application/json"} - result, dummy, error = api_fetch_url_json( + details = result['error'].get('details') or {} + fields = details.get("fields") or [] + details_str = ", ".join(['{0}: {1}'.format(to_native(field["name"]), to_native(field["message"])) for field in fields]) + module.fail_json(msg='The values to update were invalid ({0})'.format(details_str or "no details")) + after = extract(result) + + if action and not module.check_mode: + after.update(update_after_update) + action_url = "{0}/actions/update_access_settings".format(url) + try: + api_apply_action( module, - url, - data=module.jsonify(update), - headers=headers, - method='PUT', - accept_errors=['invalid_input'], + action_url, + action, + lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), + check_done_delay=1, + check_done_timeout=60, ) - if error: - details = result['error'].get('details') or {} - fields = details.get("fields") or [] - details_str = ", ".join(['{0}: {1}'.format(to_native(field["name"]), to_native(field["message"])) for field in fields]) - module.fail_json(msg='The values to update were invalid ({0})'.format(details_str or "no details")) - after = extract(result) - - if action and not module.check_mode: - after.update(update_after_update) - action_url = "{0}/actions/update_access_settings".format(url) - try: - api_apply_action( - module, - action_url, - action, - lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), - check_done_delay=1, - check_done_timeout=60, - ) - except ApplyActionError as exc: - module.fail_json(msg='Error while updating access settings: {0}'.format(exc)) + except ApplyActionError as exc: + module.fail_json(msg='Error while updating access settings: {0}'.format(exc)) result = dict(after) result['changed'] = bool(changes) diff --git a/plugins/modules/storagebox_info.py b/plugins/modules/storagebox_info.py index 5bd74201..506a4f63 100644 --- a/plugins/modules/storagebox_info.py +++ b/plugins/modules/storagebox_info.py @@ -539,10 +539,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -557,12 +555,6 @@ deprecate_value, ) -try: - from urllib.parse import urlencode -except ImportError: - # Python 2.x fallback: - from urllib import urlencode - _CONVERT = { "login": ["username"], @@ -621,49 +613,18 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - if storagebox_id is not None: - storagebox_ids = [storagebox_id] - else: - url = "{0}/storagebox".format(BASE_URL) - data = None - headers = None - if linked_server_number is not None: - data = urlencode({ - "linked_server": linked_server_number, - }) - headers = { - "Content-type": "application/x-www-form-urlencoded", - } - result, error = fetch_url_json(module, url, accept_errors=['STORAGEBOX_NOT_FOUND'], data=data) - storagebox_ids = [] - if not error: - # When filtering by linked_server, the result should be a dictionary - if isinstance(result, dict): - result = [result] - for entry in result: - if full_info: - storagebox_ids.append(entry['storagebox']['id']) - else: - storageboxes.append(entry['storagebox']) - - for storagebox_id in storagebox_ids: - url = "{0}/storagebox/{1}".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=['STORAGEBOX_NOT_FOUND']) - if not error: - storageboxes.append(result['storagebox']) + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.exit_json(changed=False, storageboxes=[]) + if storagebox_id is not None: + url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) + result, dummy, error = api_fetch_url_json(module, url, accept_errors=["not_found"]) + if error is None: + storageboxes = [result["storage_box"]] else: - # NEW API! - if storagebox_id is not None: - url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) - result, dummy, error = api_fetch_url_json(module, url, accept_errors=["not_found"]) - if error is None: - storageboxes = [result["storage_box"]] - else: - url = "{0}/v1/storage_boxes".format(API_BASE_URL) - storageboxes, dummy = api_fetch_url_json_list(module, url, data_key="storage_boxes") - storageboxes = [add_hrobot_compat_shim(storagebox) for storagebox in storageboxes] + url = "{0}/v1/storage_boxes".format(API_BASE_URL) + storageboxes, dummy = api_fetch_url_json_list(module, url, data_key="storage_boxes") + storageboxes = [add_hrobot_compat_shim(storagebox) for storagebox in storageboxes] module.exit_json( changed=False, diff --git a/plugins/modules/storagebox_set_password.py b/plugins/modules/storagebox_set_password.py index abd80bc6..debacc5d 100644 --- a/plugins/modules/storagebox_set_password.py +++ b/plugins/modules/storagebox_set_password.py @@ -83,10 +83,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -97,12 +95,6 @@ api_apply_action, ) -try: - from urllib.parse import urlencode -except ImportError: - # Python 2.x fallback: - from urllib import urlencode - def main(): argument_spec = dict( @@ -128,51 +120,30 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - url = "{0}/storagebox/{1}/password".format(BASE_URL, id) - accepted_errors = ["STORAGEBOX_NOT_FOUND", "STORAGEBOX_INVALID_PASSWORD"] - - if password: - headers = {"Content-type": "application/x-www-form-urlencoded"} - result, error = fetch_url_json( - module, url, method="POST", accept_errors=accepted_errors, data=urlencode({"password": password}), headers=headers) - else: - result, error = fetch_url_json( - module, url, method="POST", accept_errors=accepted_errors) - - if error == 'STORAGEBOX_NOT_FOUND': - module.fail_json( - msg='Storage Box with ID {0} not found'.format(id)) - - if error == 'STORAGEBOX_INVALID_PASSWORD': - module.fail_json( - msg="The chosen password has been considered insecure or does not comply with Hetzner's password guideline") - - module.exit_json(changed=True, password=result["password"]) - - else: - # NEW API! - action_url = "{0}/v1/storage_boxes/{1}/actions/reset_password".format(API_BASE_URL, id) - action = { - "password": password, - } - try: - dummy, error = api_apply_action( - module, - action_url, - action, - lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), - check_done_delay=1, - check_done_timeout=60, - accept_errors=["not_found"], - ) - except ApplyActionError as exc: - module.fail_json(msg='Error while resetting password: {0}'.format(exc)) - - if error == "not_found": - module.fail_json(msg='Storage Box with ID {0} not found'.format(id)) - - module.exit_json(changed=True, password=password) + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg='Storage Box with ID {0} not found'.format(id)) + + action_url = "{0}/v1/storage_boxes/{1}/actions/reset_password".format(API_BASE_URL, id) + action = { + "password": password, + } + try: + dummy, error = api_apply_action( + module, + action_url, + action, + lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), + check_done_delay=1, + check_done_timeout=60, + accept_errors=["not_found"], + ) + except ApplyActionError as exc: + module.fail_json(msg='Error while resetting password: {0}'.format(exc)) + + if error == "not_found": + module.fail_json(msg='Storage Box with ID {0} not found'.format(id)) + + module.exit_json(changed=True, password=password) if __name__ == '__main__': # pragma: no cover diff --git a/plugins/modules/storagebox_snapshot.py b/plugins/modules/storagebox_snapshot.py index aadd6f70..24995c8e 100644 --- a/plugins/modules/storagebox_snapshot.py +++ b/plugins/modules/storagebox_snapshot.py @@ -106,10 +106,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -128,15 +126,6 @@ from urllib import urlencode -def legacy_handle_errors(module, error, storagebox_id=None, snapshot_name=None): - error_messages = { - "STORAGEBOX_NOT_FOUND": "Storagebox with ID {0} does not exist".format(storagebox_id), - "SNAPSHOT_NOT_FOUND": "Snapshot with name {0} does not exist".format(snapshot_name), - "SNAPSHOT_LIMIT_EXCEEDED": "Snapshot limit exceeded", - } - module.fail_json(msg=error_messages.get(error, error)) - - def extract_legacy(snapshot): return { 'id': snapshot['id'], @@ -178,165 +167,85 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - - # Create snapshot - if state == 'present' and not snapshot_name: - if module.check_mode: - module.exit_json(changed=True) - snapshot = legacy_create_snapshot(module, storagebox_id) - - # Add the comment if provided - if snapshot_comment is not None: - legacy_update_snapshot_comment(module, storagebox_id, snapshot['name'], snapshot_comment) - snapshot['comment'] = snapshot_comment + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg="Storagebox with ID {0} does not exist".format(storagebox_id)) - module.exit_json(changed=True, snapshot=snapshot) - - # Update snapshot comment - elif state == 'present' and snapshot_name: - if snapshot_comment is None: - module.fail_json(msg="snapshot_comment is required when updating a snapshot") - - snapshots = legacy_fetch_snapshots(module=module, storagebox_id=storagebox_id) - snapshot = legacy_get_snapshot_by_name(snapshots, snapshot_name) - if not snapshot: - legacy_handle_errors(module, "SNAPSHOT_NOT_FOUND", snapshot_name=snapshot_name) - if snapshot_comment != snapshot['comment']: - if not module.check_mode: - legacy_update_snapshot_comment(module, storagebox_id, snapshot_name, snapshot_comment) - module.exit_json(changed=True, snapshot=snapshot) - else: - module.exit_json(changed=False, snapshot=snapshot) - - # Delete snapshot + # Create snapshot + if state == 'present' and not snapshot_name: + if module.check_mode: + module.exit_json(changed=True) + action_url = "{0}/v1/storage_boxes/{1}/snapshots".format(API_BASE_URL, storagebox_id) + action = {} + if snapshot_comment: + action["description"] = snapshot_comment + try: + extracted_ids, error = api_apply_action( + module, + action_url, + action, + lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), + check_done_delay=1, + check_done_timeout=120, + accept_errors=["not_found"], + ) + except ApplyActionError as exc: + module.fail_json(msg='Error while creating snapshot: {0}'.format(exc)) + + if error == "not_found": + module.fail_json(msg="Storagebox with ID {0} does not exist".format(storagebox_id)) + + new_snapshot_id = extracted_ids["storage_box_snapshot"] + # Retrieve created snapshot + url = "{0}/v1/storage_boxes/{1}/snapshots/{2}".format(API_BASE_URL, storagebox_id, new_snapshot_id) + snapshot = api_fetch_url_json(module, url, method='GET')[0]["snapshot"] + + module.exit_json(changed=True, snapshot=extract_legacy(snapshot)) + + # Update snapshot comment + elif state == 'present' and snapshot_name: + if snapshot_comment is None: + module.fail_json(msg="snapshot_comment is required when updating a snapshot") + + snapshot = find_snapshot(module, storagebox_id, snapshot_name) + if not snapshot: + module.fail_json(msg="Snapshot with name {0} does not exist".format(snapshot_name)) + if snapshot_comment == snapshot['description']: + module.exit_json(changed=False, snapshot=extract_legacy(snapshot)) + if not module.check_mode: + url = "{0}/v1/storage_boxes/{1}/snapshots/{2}".format(API_BASE_URL, storagebox_id, snapshot['id']) + headers = {"Content-type": "application/json"} + result, dummy, dummy2 = api_fetch_url_json( + module, + url, + method='PUT', + data=module.jsonify({"description": snapshot_comment}), + headers=headers, + ) + snapshot = result["snapshot"] else: - snapshots = legacy_fetch_snapshots(module=module, storagebox_id=storagebox_id) - snapshot = legacy_get_snapshot_by_name(snapshots, snapshot_name) - if snapshot: - if not module.check_mode: - legacy_delete_snapshot(module, storagebox_id, snapshot_name) - module.exit_json(changed=True) - else: - module.exit_json(changed=False) + snapshot['description'] = snapshot_comment + module.exit_json(changed=True, snapshot=extract_legacy(snapshot)) + # Delete snapshot else: - # NEW API! - - # Create snapshot - if state == 'present' and not snapshot_name: - if module.check_mode: - module.exit_json(changed=True) - action_url = "{0}/v1/storage_boxes/{1}/snapshots".format(API_BASE_URL, storagebox_id) - action = {} - if snapshot_comment: - action["description"] = snapshot_comment + snapshot = find_snapshot(module, storagebox_id, snapshot_name) + if not snapshot: + module.exit_json(changed=False) + if not module.check_mode: + action_url = "{0}/v1/storage_boxes/{1}/snapshots/{2}".format(API_BASE_URL, storagebox_id, snapshot['id']) try: - extracted_ids, error = api_apply_action( + api_apply_action( module, action_url, - action, + None, lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), + method='DELETE', check_done_delay=1, check_done_timeout=120, - accept_errors=["not_found"], ) except ApplyActionError as exc: - module.fail_json(msg='Error while creating snapshot: {0}'.format(exc)) - - if error == "not_found": - module.fail_json(msg="Storagebox with ID {0} does not exist".format(storagebox_id)) - - new_snapshot_id = extracted_ids["storage_box_snapshot"] - # Retrieve created snapshot - url = "{0}/v1/storage_boxes/{1}/snapshots/{2}".format(API_BASE_URL, storagebox_id, new_snapshot_id) - snapshot = api_fetch_url_json(module, url, method='GET')[0]["snapshot"] - - module.exit_json(changed=True, snapshot=extract_legacy(snapshot)) - - # Update snapshot comment - elif state == 'present' and snapshot_name: - if snapshot_comment is None: - module.fail_json(msg="snapshot_comment is required when updating a snapshot") - - snapshot = find_snapshot(module, storagebox_id, snapshot_name) - if not snapshot: - module.fail_json(msg="Snapshot with name {0} does not exist".format(snapshot_name)) - if snapshot_comment == snapshot['description']: - module.exit_json(changed=False, snapshot=extract_legacy(snapshot)) - if not module.check_mode: - url = "{0}/v1/storage_boxes/{1}/snapshots/{2}".format(API_BASE_URL, storagebox_id, snapshot['id']) - headers = {"Content-type": "application/json"} - result, dummy, dummy2 = api_fetch_url_json( - module, - url, - method='PUT', - data=module.jsonify({"description": snapshot_comment}), - headers=headers, - ) - snapshot = result["snapshot"] - else: - snapshot['description'] = snapshot_comment - module.exit_json(changed=True, snapshot=extract_legacy(snapshot)) - - # Delete snapshot - else: - snapshot = find_snapshot(module, storagebox_id, snapshot_name) - if not snapshot: - module.exit_json(changed=False) - if not module.check_mode: - action_url = "{0}/v1/storage_boxes/{1}/snapshots/{2}".format(API_BASE_URL, storagebox_id, snapshot['id']) - try: - api_apply_action( - module, - action_url, - None, - lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), - method='DELETE', - check_done_delay=1, - check_done_timeout=120, - ) - except ApplyActionError as exc: - module.fail_json(msg='Error while deleting snapshot: {0}'.format(exc)) - module.exit_json(changed=True) - - -def legacy_delete_snapshot(module, storagebox_id, snapshot_name): - url = "{0}/storagebox/{1}/snapshot/{2}".format(BASE_URL, storagebox_id, snapshot_name) - fetch_url_json(module, url, method="DELETE", allow_empty_result=True) - - -def legacy_update_snapshot_comment(module, storagebox_id, snapshot_name, snapshot_comment): - url = "{0}/storagebox/{1}/snapshot/{2}/comment".format(BASE_URL, storagebox_id, snapshot_name) - headers = {"Content-type": "application/x-www-form-urlencoded"} - fetch_url_json( - module, url, method="POST", data=urlencode({"comment": snapshot_comment}), headers=headers, allow_empty_result=True, - ) - - -def legacy_create_snapshot(module, storagebox_id): - url = "{0}/storagebox/{1}/snapshot".format(BASE_URL, storagebox_id) - result, error = fetch_url_json( - module, url, method="POST", accept_errors=["STORAGEBOX_NOT_FOUND", "SNAPSHOT_LIMIT_EXCEEDED"], - ) - if error: - legacy_handle_errors(module, error, storagebox_id) - return result['snapshot'] - - -def legacy_get_snapshot_by_name(snapshots, name): - for snapshot in snapshots: - if snapshot['name'] == name: - return snapshot - return None - - -def legacy_fetch_snapshots(module, storagebox_id): - url = "{0}/storagebox/{1}/snapshot".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, method="GET", accept_errors=["STORAGEBOX_NOT_FOUND"]) - if error: - legacy_handle_errors(module, error, storagebox_id) - return [item['snapshot'] for item in result] + module.fail_json(msg='Error while deleting snapshot: {0}'.format(exc)) + module.exit_json(changed=True) def find_snapshot(module, storagebox_id, snapshot_name): diff --git a/plugins/modules/storagebox_snapshot_info.py b/plugins/modules/storagebox_snapshot_info.py index 9b16455f..2cae4d40 100644 --- a/plugins/modules/storagebox_snapshot_info.py +++ b/plugins/modules/storagebox_snapshot_info.py @@ -175,10 +175,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -244,29 +242,18 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - url = "{0}/storagebox/{1}/snapshot".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=['STORAGEBOX_NOT_FOUND']) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - module.exit_json( - changed=False, - snapshots=[item['snapshot'] for item in result], - ) - - else: - # NEW API! - - url = "{0}/v1/storage_boxes/{1}/snapshots".format(API_BASE_URL, storagebox_id) - result, dummy, error = api_fetch_url_json(module, url, accept_errors=['not_found']) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) + url = "{0}/v1/storage_boxes/{1}/snapshots".format(API_BASE_URL, storagebox_id) + result, dummy, error = api_fetch_url_json(module, url, accept_errors=['not_found']) + if error: + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - module.exit_json( - changed=False, - snapshots=[adjust_legacy(item) for item in result['snapshots']], - ) + module.exit_json( + changed=False, + snapshots=[adjust_legacy(item) for item in result['snapshots']], + ) if __name__ == '__main__': # pragma: no cover diff --git a/plugins/modules/storagebox_snapshot_plan.py b/plugins/modules/storagebox_snapshot_plan.py index f83335dd..223120cd 100644 --- a/plugins/modules/storagebox_snapshot_plan.py +++ b/plugins/modules/storagebox_snapshot_plan.py @@ -171,10 +171,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -190,28 +188,6 @@ deprecate_value, ) -try: - from urllib.parse import urlencode -except ImportError: - # Python 2.x fallback: - from urllib import urlencode - - -LEGACY_PARAMETERS = { - 'status': ('status', 'status'), # according to the API docs 'status' cannot be provided as input to POST, but that's not true - 'minute': ('minute', 'minute'), - 'hour': ('hour', 'hour'), - 'day_of_week': ('day_of_week', 'day_of_week'), - 'day_of_month': ('day_of_month', 'day_of_month'), - 'month': ('month', 'month'), - 'max_snapshots': ('max_snapshots', 'max_snapshots'), -} - - -def extract_legacy(result): - sb = result['snapshotplan'] - return {key: sb.get(key) for key, dummy in LEGACY_PARAMETERS.values()} - def extract(result): sb = result['storage_box'] @@ -282,114 +258,44 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - url = "{0}/storagebox/{1}/snapshotplan".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=['STORAGEBOX_NOT_FOUND']) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - # The documentation (https://robot.hetzner.com/doc/webservice/en.html#get-storagebox-storagebox-id-snapshotplan) - # claims that the result is a list, but actually it is a dictionary. Convert it to a list of dicts if that's the case. - if isinstance(result, dict): - result = [result] + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - before = [extract_legacy(plan) for plan in result] - after = [ - { - data_name: ( - plan[option_name] - if plan['status'] == 'enabled' or option_name == 'status' else - None - ) - for option_name, (data_name, dummy) in LEGACY_PARAMETERS.items() - } - for plan in plans - ] - changes = [] + if plans[0]['month'] is not None: + module.fail_json(msg='The new Hetzner API does not support specifying month for a plan.') - for index, plan in enumerate(after): - existing_plan = before[index] if index < len(before) else {} - plan_values = {} - has_changes = False - for data_name, change_name in LEGACY_PARAMETERS.values(): - before_value = existing_plan.get(data_name) - after_value = plan[data_name] - if before_value != after_value: - has_changes = True - if after_value is not None and change_name is not None: - plan_values[change_name] = after_value - if has_changes: - if plan['status'] == 'disabled': - # For some reason, minute and hour are required even for disabled plans, - # even though the documentation says otherwise - plan_values['minute'] = 0 - plan_values['hour'] = 0 - changes.append((index, plan_values)) + url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) + result, dummy, error = api_fetch_url_json(module, url, accept_errors=["not_found"]) + if error: + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - if changes and not module.check_mode: - headers = {"Content-type": "application/x-www-form-urlencoded"} - # TODO: If the API ever changes to support more than one plan, the following need to change - if len(changes) != 1: # pragma: no cover - raise AssertionError('Current implementation can handle only one plan') # pragma: no cover - actual_changes = changes[0][1] - result, error = fetch_url_json( + before = extract(result) + after = { + key: (value if plan['status'] == 'enabled' or key == 'status' else None) + for key, value in plan.items() + } + action_enable = None + if before != after: + action_enable = (after['status'] == 'enabled') + action = {key: value for key, value in plan.items() if key not in ('status', 'month')} if action_enable else {} + + if action_enable is not None and not module.check_mode: + action_url = "{0}/actions/{1}".format(url, 'enable_snapshot_plan' if action_enable else 'disable_snapshot_plan') + try: + api_apply_action( module, - url, - data=urlencode(actual_changes) if actual_changes else None, - headers=headers, - method='POST', - accept_errors=['INVALID_INPUT'], + action_url, + action if action_enable else None, + lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), + check_done_delay=1, + check_done_timeout=60, ) - if error: - invalid = result['error'].get('invalid') or [] - module.fail_json(msg='The values to update were invalid ({0})'.format(', '.join(invalid))) - - # The documentation (https://robot.hetzner.com/doc/webservice/en.html#post-storagebox-storagebox-id-snapshotplan) - # claims that the result is a list, but actually it is a dictionary. Convert it to a list of dicts if that's the case. - if isinstance(result, dict): - result = [result] - - after = [extract_legacy(plan) for plan in result] - - changed = bool(changes) - - else: - # NEW API! - if plans[0]['month'] is not None: - module.fail_json(msg='The new Hetzner API does not support specifying month for a plan.') - - url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) - result, dummy, error = api_fetch_url_json(module, url, accept_errors=["not_found"]) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - before = extract(result) - after = { - key: (value if plan['status'] == 'enabled' or key == 'status' else None) - for key, value in plan.items() - } - action_enable = None - if before != after: - action_enable = (after['status'] == 'enabled') - action = {key: value for key, value in plan.items() if key not in ('status', 'month')} if action_enable else {} - - if action_enable is not None and not module.check_mode: - action_url = "{0}/actions/{1}".format(url, 'enable_snapshot_plan' if action_enable else 'disable_snapshot_plan') - try: - api_apply_action( - module, - action_url, - action if action_enable else None, - lambda action_id: "{0}/v1/storage_boxes/actions/{1}".format(API_BASE_URL, action_id), - check_done_delay=1, - check_done_timeout=60, - ) - except ApplyActionError as exc: - module.fail_json(msg='Error while updating the snapshot plan: {0}'.format(exc)) + except ApplyActionError as exc: + module.fail_json(msg='Error while updating the snapshot plan: {0}'.format(exc)) - changed = action_enable is not None - before = [before] - after = [after] + changed = action_enable is not None + before = [before] + after = [after] module.exit_json( changed=changed, diff --git a/plugins/modules/storagebox_snapshot_plan_info.py b/plugins/modules/storagebox_snapshot_plan_info.py index 50bc3a15..443ef949 100644 --- a/plugins/modules/storagebox_snapshot_plan_info.py +++ b/plugins/modules/storagebox_snapshot_plan_info.py @@ -117,10 +117,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -135,19 +133,6 @@ ) -def extract_legacy(result): - sb = result['snapshotplan'] - return { - 'status': sb['status'], - 'minute': sb['minute'], - 'hour': sb['hour'], - 'day_of_week': sb['day_of_week'], - 'day_of_month': sb['day_of_month'], - 'month': sb['month'], - 'max_snapshots': sb['max_snapshots'], - } - - def extract(result): sb = result['storage_box'] sp = sb.get('snapshot_plan') @@ -194,27 +179,15 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - url = "{0}/storagebox/{1}/snapshotplan".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=['STORAGEBOX_NOT_FOUND']) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - # The documentation (https://robot.hetzner.com/doc/webservice/en.html#get-storagebox-storagebox-id-snapshotplan) - # claims that the result is a list, but actually it is a dictionary. Convert it to a list of dicts if that's the case. - if isinstance(result, dict): - result = [result] - - plans = [extract_legacy(plan) for plan in result] - - else: - # NEW API! - url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) - result, dummy, error = api_fetch_url_json(module, url, accept_errors=["not_found"]) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - plans = [extract(result)] + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) + + url = "{0}/v1/storage_boxes/{1}".format(API_BASE_URL, storagebox_id) + result, dummy, error = api_fetch_url_json(module, url, accept_errors=["not_found"]) + if error: + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) + + plans = [extract(result)] module.exit_json( changed=False, diff --git a/plugins/modules/storagebox_subaccount.py b/plugins/modules/storagebox_subaccount.py index 8c48252f..09b09522 100644 --- a/plugins/modules/storagebox_subaccount.py +++ b/plugins/modules/storagebox_subaccount.py @@ -282,10 +282,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -301,133 +299,6 @@ deprecate_value, ) -try: - from urllib.parse import urlencode -except ImportError: - # Python 2.x fallback: - from urllib import urlencode - - -def legacy_encode_data(data): - """Converts booleans to lowercase strings and filters out None values.""" - return urlencode( - { - key: str(value).lower() if isinstance(value, bool) else value - for key, value in data.items() - if value is not None - } - ) - - -def legacy_create_subaccount(module, storagebox_id, subaccount): - url = "{0}/storagebox/{1}/subaccount".format(BASE_URL, storagebox_id) - res, error = fetch_url_json( - module, - url, - method="POST", - data=legacy_encode_data(subaccount), - headers={"Content-type": "application/x-www-form-urlencoded"}, - accept_errors=[ - "STORAGEBOX_SUBACCOUNT_LIMIT_EXCEEDED", - "STORAGEBOX_INVALID_PASSWORD", - ], - timeout=30000, # this endpoint is stupidly slow - ) - - if error == "STORAGEBOX_INVALID_PASSWORD": - module.fail_json(msg="Invalid password (says Hetzner)") - if error == "STORAGEBOX_SUBACCOUNT_LIMIT_EXCEEDED": - module.fail_json(msg="Subaccount limit exceeded") - - # Contains all subaccount informations - # { "subaccount": } - return res["subaccount"] - - -def legacy_merge_subaccounts_infos(original, updates): - # None values aren't updated - result = original.copy() - for key, value in updates.items(): - if value is not None: - result[key] = value - return result - - -def legacy_is_subaccount_updated(before, after): - for key, value in after.items(): - # Means user didn't provide a value - # we assume we don't want to update that field - if value is None: - continue - # password aren't considered part of update check - # due to being a different API call - if key == "password": - continue - if before.get(key) != value: - return True - return False - - -def legacy_delete_subaccount(module, storagebox_id, subaccount): - empty, error = fetch_url_json( - module, - "{0}/storagebox/{1}/subaccount/{2}".format( - BASE_URL, storagebox_id, subaccount["username"] - ), - method="DELETE", - allow_empty_result=True, - headers={"Content-type": "application/x-www-form-urlencoded"}, - ) - - -def legacy_update_subaccount(module, storagebox_id, subaccount): - empty, error = fetch_url_json( - module, - "{0}/storagebox/{1}/subaccount/{2}".format( - BASE_URL, storagebox_id, subaccount["username"] - ), - method="PUT", - data=legacy_encode_data({key: value for key, value in subaccount.items() if key != "password"}), - headers={"Content-type": "application/x-www-form-urlencoded"}, - allow_empty_result=True, - timeout=30000, # this endpoint is stupidly slow - ) - - -def legacy_update_subaccount_password(module, storagebox_id, subaccount): - new_password, error = fetch_url_json( - module, - "{0}/storagebox/{1}/subaccount/{2}/password".format( - BASE_URL, storagebox_id, subaccount["username"] - ), - method="POST", - data=legacy_encode_data({"password": subaccount["password"]}), - headers={"Content-type": "application/x-www-form-urlencoded"}, - accept_errors=[ - "STORAGEBOX_INVALID_PASSWORD", - ], - timeout=30000, # this endpoint is stupidly slow - ) - if error == "STORAGEBOX_INVALID_PASSWORD": - module.fail_json(msg="Invalid password (says Hetzner)") - - # { "password": } - return new_password["password"] - - -def legacy_get_subaccounts(module, storagebox_id): - url = "{0}/storagebox/{1}/subaccount".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=["STORAGEBOX_NOT_FOUND"]) - if error: - module.fail_json( - msg="Storagebox with ID {0} does not exist".format(storagebox_id) - ) - # Hetzner's response [ { "subaccount": }, ... ] - return [item["subaccount"] for item in result] - - -# ----------------------------------------- - def create_subaccount(module, storagebox_id, subaccount): action_url = "{0}/v1/storage_boxes/{1}/subaccounts".format(API_BASE_URL, storagebox_id) @@ -682,139 +553,77 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API - - existing_subaccounts = legacy_get_subaccounts(module, storagebox_id) - - matches = [ - sa for sa in existing_subaccounts - if sa[idempotence] == account_identifier - ] - if len(matches) > 1: - module.fail_json(msg="More than one subaccount matched the idempotence criteria.") - - existing = matches[0] if matches else None - - created = deleted = updated = password_updated = False - - if state == "absent": - if existing: - if not check_mode: - legacy_delete_subaccount(module, storagebox_id, existing) - deleted = True - elif state == "present" and existing: - # Set the found username in case user used comment as idempotence - subaccount["username"] = existing["username"] - - if ( - password_mode == "set-to-random" or - (password_mode == "update-if-provided" and subaccount["password"]) - ): - if password_mode == "set-to-random": - subaccount["password"] = None - if not check_mode: - new_password = legacy_update_subaccount_password(module, storagebox_id, subaccount) - subaccount["password"] = new_password - password_updated = True - - if legacy_is_subaccount_updated(existing, subaccount): - if not check_mode: - legacy_update_subaccount(module, storagebox_id, subaccount) - updated = True - else: # state 'present' without pre-existing account - if not subaccount["homedirectory"]: - module.fail_json(msg="homedirectory is required when creating a new subaccount") - if password_mode == "set-to-random": - subaccount["password"] = None - - del subaccount["username"] # username cannot be choosen + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg="Storagebox with ID {0} does not exist".format(storagebox_id)) + + if password_mode == 'set-to-random': + module.fail_json(msg="The new Hetzner API does not support password_mode=set-to-random") + if idempotence == 'comment': + idempotence = 'description' + + existing_subaccounts = get_subaccounts(module, storagebox_id) + + matches = [ + sa for sa in existing_subaccounts + if sa[idempotence] == account_identifier + ] + if len(matches) > 1: + module.fail_json(msg="More than one subaccount matched the idempotence criteria.") + + existing = matches[0] if matches else None + + created = deleted = updated = password_updated = False + + if state == "absent": + if existing: if not check_mode: - # not necessary, allows us to get additional infos (created time etc...) - existing = legacy_create_subaccount(module, storagebox_id, subaccount) - created = True - - return_data = legacy_merge_subaccounts_infos(existing or {}, subaccount) - - module.exit_json( - changed=any([created, deleted, updated, password_updated]), - created=created, - deleted=deleted, - updated=updated, - password_updated=password_updated, - subaccount=return_data if state != "absent" else None, - ) + delete_subaccount(module, storagebox_id, existing) + deleted = True + elif state == "present" and existing: + # Set the found username in case user used comment as idempotence + subaccount["username"] = existing["username"] + + if ( + password_mode == "update-if-provided" and subaccount["password"] + ): + if not check_mode: + update_subaccount_password(module, storagebox_id, existing, subaccount["password"]) + password_updated = True - else: - # NEW API! - - if password_mode == 'set-to-random': - module.fail_json(msg="The new Hetzner API does not support password_mode=set-to-random") - if idempotence == 'comment': - idempotence = 'description' - - existing_subaccounts = get_subaccounts(module, storagebox_id) - - matches = [ - sa for sa in existing_subaccounts - if sa[idempotence] == account_identifier - ] - if len(matches) > 1: - module.fail_json(msg="More than one subaccount matched the idempotence criteria.") - - existing = matches[0] if matches else None - - created = deleted = updated = password_updated = False - - if state == "absent": - if existing: - if not check_mode: - delete_subaccount(module, storagebox_id, existing) - deleted = True - elif state == "present" and existing: - # Set the found username in case user used comment as idempotence - subaccount["username"] = existing["username"] - - if ( - password_mode == "update-if-provided" and subaccount["password"] - ): - if not check_mode: - update_subaccount_password(module, storagebox_id, existing, subaccount["password"]) - password_updated = True - - update, access_settings = get_subaccount_updates(existing, subaccount) - if update: - if not check_mode: - update_subaccount(module, storagebox_id, existing, update) - updated = True - if access_settings: - if not check_mode: - update_access_settings(module, storagebox_id, existing, access_settings) - updated = True - else: # state 'present' without pre-existing account - if not subaccount["homedirectory"]: - module.fail_json(msg="homedirectory is required when creating a new subaccount") - if not subaccount["password"]: - module.fail_json(msg="password is required when creating a new subaccount") - - del subaccount["username"] # username cannot be choosen + update, access_settings = get_subaccount_updates(existing, subaccount) + if update: if not check_mode: - new_subaccount_id = create_subaccount(module, storagebox_id, subaccount) - # Retrieve created subaccount - # (not necessary, allows us to get additional infos (created time etc...)) - url = "{0}/v1/storage_boxes/{1}/subaccounts/{2}".format(API_BASE_URL, storagebox_id, new_subaccount_id) - existing = api_fetch_url_json(module, url, method='GET')[0]["subaccount"] - created = True - - return_data = merge_subaccounts_infos(existing or {}, subaccount) - - module.exit_json( - changed=any([created, deleted, updated, password_updated]), - created=created, - deleted=deleted, - updated=updated, - password_updated=password_updated, - subaccount=adjust_legacy(return_data) if state != "absent" else None, - ) + update_subaccount(module, storagebox_id, existing, update) + updated = True + if access_settings: + if not check_mode: + update_access_settings(module, storagebox_id, existing, access_settings) + updated = True + else: # state 'present' without pre-existing account + if not subaccount["homedirectory"]: + module.fail_json(msg="homedirectory is required when creating a new subaccount") + if not subaccount["password"]: + module.fail_json(msg="password is required when creating a new subaccount") + + del subaccount["username"] # username cannot be choosen + if not check_mode: + new_subaccount_id = create_subaccount(module, storagebox_id, subaccount) + # Retrieve created subaccount + # (not necessary, allows us to get additional infos (created time etc...)) + url = "{0}/v1/storage_boxes/{1}/subaccounts/{2}".format(API_BASE_URL, storagebox_id, new_subaccount_id) + existing = api_fetch_url_json(module, url, method='GET')[0]["subaccount"] + created = True + + return_data = merge_subaccounts_infos(existing or {}, subaccount) + + module.exit_json( + changed=any([created, deleted, updated, password_updated]), + created=created, + deleted=deleted, + updated=updated, + password_updated=password_updated, + subaccount=adjust_legacy(return_data) if state != "absent" else None, + ) if __name__ == "__main__": # pragma: no cover diff --git a/plugins/modules/storagebox_subaccount_info.py b/plugins/modules/storagebox_subaccount_info.py index e1f51ae6..fc85df26 100644 --- a/plugins/modules/storagebox_subaccount_info.py +++ b/plugins/modules/storagebox_subaccount_info.py @@ -226,10 +226,8 @@ from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.hrobot.plugins.module_utils.robot import ( - BASE_URL, ROBOT_DEFAULT_ARGUMENT_SPEC, _ROBOT_DEFAULT_ARGUMENT_SPEC_COMPAT_DEPRECATED, - fetch_url_json, ) from ansible_collections.community.hrobot.plugins.module_utils.api import ( @@ -310,32 +308,18 @@ def main(): collection_name="community.hrobot", version="3.0.0", ) - # DEPRECATED: old API + module.warn("The old storagebox API has been disabled by Hetzner. The supporting code has been removed.") + module.fail_json(msg="Storagebox with ID {0} does not exist".format(storagebox_id)) - url = "{0}/storagebox/{1}/subaccount".format(BASE_URL, storagebox_id) - result, error = fetch_url_json(module, url, accept_errors=["STORAGEBOX_NOT_FOUND"]) - if error: - module.fail_json( - msg="Storagebox with ID {0} does not exist".format(storagebox_id) - ) + url = "{0}/v1/storage_boxes/{1}/subaccounts".format(API_BASE_URL, storagebox_id) + result, dummy, error = api_fetch_url_json(module, url, accept_errors=['not_found']) + if error: + module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - module.exit_json( - changed=False, - subaccounts=[item["subaccount"] for item in result], - ) - - else: - # NEW API! - - url = "{0}/v1/storage_boxes/{1}/subaccounts".format(API_BASE_URL, storagebox_id) - result, dummy, error = api_fetch_url_json(module, url, accept_errors=['not_found']) - if error: - module.fail_json(msg='Storagebox with ID {0} does not exist'.format(storagebox_id)) - - module.exit_json( - changed=False, - subaccounts=[adjust_legacy(item) for item in result['subaccounts']], - ) + module.exit_json( + changed=False, + subaccounts=[adjust_legacy(item) for item in result['subaccounts']], + ) if __name__ == "__main__": # pragma: no cover diff --git a/tests/sanity/ignore-2.15.txt b/tests/sanity/ignore-2.15.txt index d9ca91cd..275373d3 100644 --- a/tests/sanity/ignore-2.15.txt +++ b/tests/sanity/ignore-2.15.txt @@ -1,4 +1,3 @@ plugins/inventory/robot.py yamllint:unparsable-with-libyaml plugins/module_utils/robot.py pylint:ansible-bad-import-from -plugins/modules/storagebox_snapshot_plan.py pylint:unbalanced-dict-unpacking # only seems to be a problem with pylint included in ansible-core 2.15 tests/ee/roles/smoke/library/smoke_ipaddress.py shebang diff --git a/tests/unit/plugins/modules/test_storagebox.py b/tests/unit/plugins/modules/test_storagebox.py index 18e0693e..aa1007e4 100644 --- a/tests/unit/plugins/modules/test_storagebox.py +++ b/tests/unit/plugins/modules/test_storagebox.py @@ -14,63 +14,9 @@ from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import call, MagicMock from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox -STORAGEBOX_LEGACY_KEYS = ('name', 'webdav', 'samba', 'ssh', 'external_reachability', 'zfs') - -STORAGEBOX_LEGACY_DETAIL_DATA = { - 23: { - 'storagebox': { - 'id': 23, - 'login': 'u23', - 'name': 'Backup Server 2', - 'product': 'BX11', - 'cancelled': True, - 'locked': False, - 'location': 'HEL1', - 'linked_server': None, - 'paid_until': '2025-01-31', - 'disk_quota': 1234, - 'disk_usage': 123, - 'disk_usage_data': 50, - 'disk_usage_snapshots': 73, - 'webdav': False, - 'samba': False, - 'ssh': True, - 'external_reachability': True, - 'zfs': False, - 'server': 'u23.your-storagebox.de', - 'host_system': 'HEL1-FOOBAR' - }, - }, - 123456: { - 'storagebox': { - 'id': 123456, - 'login': 'u12345', - 'name': 'Backup Server 1', - 'product': 'BX60', - 'cancelled': False, - 'locked': False, - 'location': 'FSN1', - 'linked_server': 1234567, - 'paid_until': '2015-10-23', - 'disk_quota': 10240000, - 'disk_usage': 900, - 'disk_usage_data': 500, - 'disk_usage_snapshots': 400, - 'webdav': True, - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'zfs': False, - 'server': 'u12345.your-storagebox.de', - 'host_system': 'FSN1-BX355' - }, - }, -} - STORAGEBOX_KEYS = { 'name': ['name'], 'webdav': ['access_settings', 'webdav_enabled'], @@ -231,14 +177,6 @@ } -def legacy_update_info(id, **updates): - result = { - 'storagebox': dict(STORAGEBOX_LEGACY_DETAIL_DATA[id]['storagebox']), - } - result['storagebox'].update(updates) - return result - - def update_info(id, **updates): result = { 'storage_box': dict(STORAGEBOX_DETAIL_DATA[id]), @@ -263,363 +201,15 @@ class TestHetznerStorageboxLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_idempotent(self, mocker): - updated = legacy_update_info(23) - result = self.run_module_success(mocker, storagebox, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 23, - 'name': 'Backup Server 2', - 'webdav': False, - 'samba': False, - 'ssh': True, - 'external_reachability': True, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['changed'] is False - for key in STORAGEBOX_LEGACY_KEYS: - assert result[key] == updated['storagebox'][key], "Unexpected difference for {0!r}".format(key) - def test_id_unknown(self, mocker): result = self.run_module_failed(mocker, storagebox, { 'hetzner_user': '', 'hetzner_password': '', 'id': 1, }, [ - FetchUrlCall('GET', 404) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storagebox not found', - }, - }) - .expect_url('{0}/storagebox/1'.format(BASE_URL)), ]) assert result['msg'] == 'Storagebox with ID 1 does not exist' - def test_invalid_input(self, mocker): - result = self.run_module_failed(mocker, storagebox, { - 'hetzner_user': '', - 'hetzner_password': '', - 'id': 23, - 'name': 'Backup', - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 400) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json({ - 'error': { - 'status': 400, - 'code': 'INVALID_INPUT', - 'message': 'Invalid input', - 'invalid': ['storagebox_name'], - 'missing': None, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['msg'] == 'The values to update were invalid (storagebox_name)' - - def test_change_name(self, mocker): - updated = legacy_update_info(23, name='Backup') - result = self.run_module_success(mocker, storagebox, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 23, - 'name': 'Backup', - 'ssh': True, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 200) - .result_json(updated) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['changed'] is True - for key in STORAGEBOX_LEGACY_KEYS: - assert result[key] == updated['storagebox'][key], "Unexpected difference for {0!r}".format(key) - - def test_change_name_rate_limit_fail(self, mocker): - result = self.run_module_failed(mocker, storagebox, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 23, - 'name': 'Backup', - 'ssh': True, - 'rate_limit_retry_timeout': 0, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 5, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['msg'] == ( - 'Request failed: 403 RATE_LIMIT_EXCEEDED (Rate limit exceeded).' - ' Maximum allowed requests: 1. Time interval in seconds: 5' - ) - - def test_change_name_rate_limit(self, mocker): - sleep_mock = MagicMock() - mocker.patch('time.sleep', sleep_mock) - updated = legacy_update_info(23, name='Backup') - result = self.run_module_success(mocker, storagebox, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 23, - 'name': 'Backup', - 'ssh': True, - 'rate_limit_retry_timeout': -1, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 5, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 3, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 4, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 200) - .result_json(updated) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['changed'] is True - for key in STORAGEBOX_LEGACY_KEYS: - assert result[key] == updated['storagebox'][key], "Unexpected difference for {0!r}".format(key) - sleep_mock.assert_has_calls([ - call(5), - call(3), - call(3), - call(3), - ]) - - def test_change_name_rate_limit_timeout(self, mocker): - elapsed = [123.4] - - def sleep(duration): - elapsed[0] += duration - print('sleep', duration, '->', elapsed[0]) - - def get_time(): - elapsed[0] += 0.03 - print('get', elapsed[0]) - return elapsed[0] - - mocker.patch('time.sleep', sleep) - mocker.patch('time.time', get_time) - result = self.run_module_failed(mocker, storagebox, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 23, - 'name': 'Backup', - 'ssh': True, - 'rate_limit_retry_timeout': 7, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 5, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 5, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 403) - .result_json({ - 'error': { - 'status': 403, - 'code': 'RATE_LIMIT_EXCEEDED', - 'message': 'Rate limit exceeded', - 'interval': 5, - 'max_request': 1, - }, - }) - .expect_form_value('storagebox_name', 'Backup') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value_absent('ssh') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['msg'] == ( - 'Request failed: 403 RATE_LIMIT_EXCEEDED (Rate limit exceeded).' - ' Maximum allowed requests: 1. Time interval in seconds: 5.' - ' Waited a total of 5.1 seconds for rate limit errors to go away' - ) - - def test_change_ssh(self, mocker): - updated = legacy_update_info(23, ssh=False) - result = self.run_module_success(mocker, storagebox, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 23, - 'name': 'Backup Server 2', - 'ssh': False, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_LEGACY_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - FetchUrlCall('POST', 200) - .result_json(updated) - .expect_form_value_absent('storagebox_name') - .expect_form_value_absent('webdav') - .expect_form_value_absent('samba') - .expect_form_value('ssh', 'false') - .expect_form_value_absent('external_reachability') - .expect_form_value_absent('zfs') - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['changed'] is True - for key in STORAGEBOX_LEGACY_KEYS: - assert result[key] == updated['storagebox'][key], "Unexpected difference for {0!r}".format(key) - class TestHetznerStoragebox(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox.AnsibleModule' diff --git a/tests/unit/plugins/modules/test_storagebox_info.py b/tests/unit/plugins/modules/test_storagebox_info.py index 0b308e90..e75f0e31 100644 --- a/tests/unit/plugins/modules/test_storagebox_info.py +++ b/tests/unit/plugins/modules/test_storagebox_info.py @@ -14,91 +14,9 @@ from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import call, MagicMock from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox_info -STORAGEBOX_MINIMUM_DATA = [ - { - 'storagebox': { - 'id': 123456, - 'login': 'u12345', - 'name': 'Backup Server 1', - 'product': 'BX60', - 'cancelled': False, - 'locked': False, - 'location': 'FSN1', - 'linked_server': 1234567, - 'paid_until': '2015-10-23', - }, - }, - { - 'storagebox': { - 'id': 23, - 'login': 'u23', - 'name': 'Backup Server 2', - 'product': 'BX11', - 'cancelled': True, - 'locked': False, - 'location': 'HEL1', - 'linked_server': None, - 'paid_until': '2025-01-31', - }, - }, -] - - -STORAGEBOX_DETAIL_DATA = { - 23: { - 'storagebox': { - 'id': 23, - 'login': 'u23', - 'name': 'Backup Server 2', - 'product': 'BX11', - 'cancelled': True, - 'locked': False, - 'location': 'HEL1', - 'linked_server': None, - 'paid_until': '2025-01-31', - 'disk_quota': 1234, - 'disk_usage': 123, - 'disk_usage_data': 50, - 'disk_usage_snapshots': 73, - 'webdav': False, - 'samba': False, - 'ssh': True, - 'external_reachability': True, - 'zfs': False, - 'server': 'u23.your-storagebox.de', - 'host_system': 'HEL1-FOOBAR' - }, - }, - 123456: { - 'storagebox': { - 'id': 123456, - 'login': 'u12345', - 'name': 'Backup Server 1', - 'product': 'BX60', - 'cancelled': False, - 'locked': False, - 'location': 'FSN1', - 'linked_server': 1234567, - 'paid_until': '2015-10-23', - 'disk_quota': 10240000, - 'disk_usage': 900, - 'disk_usage_data': 500, - 'disk_usage_snapshots': 400, - 'webdav': True, - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'zfs': False, - 'server': 'u12345.your-storagebox.de', - 'host_system': 'FSN1-BX355' - }, - }, -} - STORAGEBOX_API_DATA = { 23: { 'id': 23, @@ -254,138 +172,12 @@ class TestHetznerStorageboxInfoLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_info.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_storagebox_id(self, mocker): - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(STORAGEBOX_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 1 - assert result['storageboxes'][0] == STORAGEBOX_DETAIL_DATA[23]['storagebox'] - def test_server_number_unknown(self, mocker): result = self.run_module_success(mocker, storagebox_info, { 'hetzner_user': '', 'hetzner_password': '', 'storagebox_id': 1, }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'server not found', - }, - }) - .expect_url('{0}/storagebox/1'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 0 - - def test_all(self, mocker): - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': '', - 'hetzner_password': '', - }, [ - FetchUrlCall('GET', 200) - .result_json(STORAGEBOX_MINIMUM_DATA) - .expect_url('{0}/storagebox'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 2 - assert result['storageboxes'][0] == STORAGEBOX_MINIMUM_DATA[0]['storagebox'] - assert result['storageboxes'][1] == STORAGEBOX_MINIMUM_DATA[1]['storagebox'] - - def test_linked_server_number(self, mocker): - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': '', - 'hetzner_password': '', - 'linked_server_number': 1234567, - }, [ - FetchUrlCall('GET', 200) - .result_json(STORAGEBOX_MINIMUM_DATA[0]) - .expect_form_value('linked_server', '1234567') - .expect_url('{0}/storagebox'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 1 - assert result['storageboxes'][0] == STORAGEBOX_MINIMUM_DATA[0]['storagebox'] - - def test_linked_server_number_unknown(self, mocker): - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': '', - 'hetzner_password': '', - 'linked_server_number': 1, - }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'server not found', - }, - }) - .expect_form_value('linked_server', '1') - .expect_url('{0}/storagebox'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 0 - - def test_all_full_info(self, mocker): - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': '', - 'hetzner_password': '', - 'full_info': True, - }, [ - FetchUrlCall('GET', 200) - .result_json(STORAGEBOX_MINIMUM_DATA) - .expect_url('{0}/storagebox'.format(BASE_URL)), - FetchUrlCall('GET', 200) - .result_json(STORAGEBOX_DETAIL_DATA[123456]) - .expect_url('{0}/storagebox/123456'.format(BASE_URL)), - FetchUrlCall('GET', 200) - .result_json(STORAGEBOX_DETAIL_DATA[23]) - .expect_url('{0}/storagebox/23'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 2 - assert result['storageboxes'][0] == STORAGEBOX_DETAIL_DATA[123456]['storagebox'] - assert result['storageboxes'][1] == STORAGEBOX_DETAIL_DATA[23]['storagebox'] - - def test_all_none(self, mocker): - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': '', - 'hetzner_password': '', - }, [ - FetchUrlCall('GET', 200) - .result_json([]) - .expect_url('{0}/storagebox'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['storageboxes']) == 0 - - def test_all_none_error(self, mocker): - # According to the API docs, when no storagebox is found this API can return 404. - result = self.run_module_success(mocker, storagebox_info, { - 'hetzner_user': '', - 'hetzner_password': '', - }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'server not found', - }, - }) - .expect_url('{0}/storagebox'.format(BASE_URL)), ]) assert result['changed'] is False assert len(result['storageboxes']) == 0 diff --git a/tests/unit/plugins/modules/test_storagebox_set_password.py b/tests/unit/plugins/modules/test_storagebox_set_password.py index 27871cf8..069bc346 100644 --- a/tests/unit/plugins/modules/test_storagebox_set_password.py +++ b/tests/unit/plugins/modules/test_storagebox_set_password.py @@ -14,7 +14,6 @@ from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import MagicMock from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox_set_password RANDOM_PASSWORD = 'randompassword' @@ -25,78 +24,14 @@ class TestStorageboxSetPasswordLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_set_password.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_specific_password(self, mocker): - result = self.run_module_success(mocker, storagebox_set_password, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 123, - 'password': 'newpassword' - }, [ - FetchUrlCall("POST", 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .expect_form_value('password', 'newpassword') - .result_json({'password': 'newpassword'}) - .expect_url(BASE_URL + '/storagebox/123/password'), - ]) - assert result['changed'] is True - assert result['password'] == 'newpassword' - - def test_random_password(self, mocker): - result = self.run_module_success(mocker, storagebox_set_password, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 123, - }, [ - FetchUrlCall("POST", 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .expect_form_value_absent('password') - .result_json({'password': RANDOM_PASSWORD}) - .expect_url(BASE_URL + '/storagebox/123/password'), - ]) - assert result['changed'] is True - assert result['password'] == RANDOM_PASSWORD - def test_id_unknown(self, mocker): result = self.run_module_failed(mocker, storagebox_set_password, { 'hetzner_user': 'test', 'hetzner_password': 'hunter2', 'id': 456, - }, [FetchUrlCall("POST", 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storage Box with ID 456 not found', - }, - }) - .expect_url(BASE_URL + '/storagebox/456/password'), - ]) + }, []) assert result['msg'] == 'Storage Box with ID 456 not found' - def test_password_invalid(self, mocker): - result = self.run_module_failed(mocker, storagebox_set_password, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'id': 123, - 'password': 'invalidpassword', - }, [ - FetchUrlCall("POST", 409) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json({ - 'error': { - 'status': 409, - 'code': 'STORAGEBOX_INVALID_PASSWORD', - 'message': "The chosen password has been considered insecure or does not comply with Hetzner's password guideline", - }}) - .expect_url(BASE_URL + '/storagebox/123/password') - ]) - assert result['msg'] == "The chosen password has been considered insecure or does not comply with Hetzner's password guideline" - class TestStorageboxSetPassword(BaseTestModule): diff --git a/tests/unit/plugins/modules/test_storagebox_snapshot.py b/tests/unit/plugins/modules/test_storagebox_snapshot.py index 1f3aefd5..008485d6 100644 --- a/tests/unit/plugins/modules/test_storagebox_snapshot.py +++ b/tests/unit/plugins/modules/test_storagebox_snapshot.py @@ -14,41 +14,8 @@ from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import MagicMock from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox_snapshot -LEGACY_CREATED_SNAPSHOT = { - 'snapshot': { - 'name': '2025-03-28T15-20-51', - 'timestamp': '2025-03-28T16:20:51+01:00', - 'size': 0 - } -} - -LEGACY_EXISTING_SNAPSHOTS = [ - { - 'snapshot': { - 'name': '2015-12-21T12-40-38', - 'timestamp': '2015-12-21T13:40:38+00:00', - 'size': 400, - 'filesystem_size': 12345, - 'automatic': False, - 'comment': 'Test-Snapshot 1' - } - }, - { - 'snapshot': { - 'name': '2025-03-28T15-20-51', - 'timestamp': '2025-03-28T15:19:30+00:00', - 'size': 10000, - 'filesystem_size': 22345, - 'automatic': False, - 'comment': 'Test-Snapshot 2' - } - } -] - - STORAGEBOX_SNAPSHOTS = { 1: { "id": 1, @@ -83,98 +50,6 @@ class TestHetznerStorageboxSnapshotPlanInfoLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_snapshot.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_create_snapshot(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23}, [ - FetchUrlCall('POST', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_CREATED_SNAPSHOT) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - ]) - assert result['changed'] is True - assert result['snapshot'] == LEGACY_CREATED_SNAPSHOT['snapshot'] - - def test_create_snapshot_check_mode(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - '_ansible_check_mode': True}, [ - ]) - assert result['changed'] is True - - def test_create_snapshot_with_comment(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_comment': 'On Creation Comment'}, [ - FetchUrlCall('POST', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_CREATED_SNAPSHOT) - .expect_url(BASE_URL + '/storagebox/23/snapshot'), - FetchUrlCall("POST", 200) - .expect_url('{0}/storagebox/23/snapshot/{1}/comment'.format(BASE_URL, LEGACY_CREATED_SNAPSHOT['snapshot']['name'])) - .result_json({ - 'snapshot': { - 'name': '2025-03-28T15-20-51', - 'timestamp': '2025-03-28T16:20:51+01:00', - 'size': 0, - 'comment': 'On Creation Comment' - }}) - ]) - assert result['changed'] is True - assert result['snapshot']['comment'] == 'On Creation Comment' - - def test_comment_snapshot(self, mocker): - self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_name': '2025-03-28T15-20-51', - 'snapshot_comment': 'Changing Comment', - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_EXISTING_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot'), - FetchUrlCall('POST', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .expect_url('{0}/storagebox/23/snapshot/{1}/comment'.format(BASE_URL, '2025-03-28T15-20-51')) - ]) - - def test_same_comment_snapshot(self, mocker): - self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_name': '2025-03-28T15-20-51', - 'snapshot_comment': 'Test-Snapshot 2', - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_EXISTING_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - ]) - - def test_comment_snapshot_check_mode(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_name': '2025-03-28T15-20-51', - 'snapshot_comment': 'Changing Comment', - '_ansible_check_mode': True - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_EXISTING_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot'), - ]) - assert result['changed'] is True - def test_comment_snapshot_nonexistent_storagebox(self, mocker): self.run_module_failed(mocker, storagebox_snapshot, { 'hetzner_user': 'test', @@ -183,153 +58,8 @@ def test_comment_snapshot_nonexistent_storagebox(self, mocker): 'snapshot_name': '2025-03-28T15-20-51', 'snapshot_comment': 'Changing Comment', }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storagebox with ID 54 does not exist', - } - }) - .expect_url(BASE_URL + '/storagebox/54/snapshot') ]) - def test_delete_snapshot(self, mocker): - self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_name': '2025-03-28T15-20-51', - 'state': 'absent' - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_EXISTING_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot'), - FetchUrlCall('DELETE', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .expect_url('{0}/storagebox/23/snapshot/{1}'.format(BASE_URL, '2025-03-28T15-20-51')) - ]) - - def test_delete_snapshot_check_mode(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_name': '2025-03-28T15-20-51', - 'state': 'absent', - '_ansible_check_mode': True - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_EXISTING_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - ]) - assert result['changed'] is True - - def test_create_limit_exceeded(self, mocker): - resutl = self.run_module_failed(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - }, [ - FetchUrlCall('POST', 409) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .expect_url('{0}/storagebox/23/snapshot'.format(BASE_URL)) - ]) - resutl['msg'] == 'Snapshot limit exceeded' - - def test_create_with_state_present(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'state': 'present'}, [ - FetchUrlCall('POST', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_CREATED_SNAPSHOT) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - ]) - assert result['changed'] is True - assert result['snapshot'] == LEGACY_CREATED_SNAPSHOT['snapshot'] - - def test_delete_nonexistent_snapshot(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'snapshot_name': 'does-not-exist', - 'state': 'absent' - }, [ - FetchUrlCall("GET", 200) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - .result_json(LEGACY_EXISTING_SNAPSHOTS) - ]) - assert result['changed'] is False - - def test_delete_snapshot_nonexistent_storagebox(self, mocker): - result = self.run_module_failed(mocker, storagebox_snapshot, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 54, - 'snapshot_name': '2025-03-28T15-20-51', - 'state': 'absent' - }, [ - FetchUrlCall("GET", 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storagebox with ID 54 does not exist', - } - }) - .expect_url(BASE_URL + '/storagebox/54/snapshot') - ]) - assert result['msg'] == 'Storagebox with ID 54 does not exist' - - def test_storagebox_id_unknown(self, mocker): - result = self.run_module_failed(mocker, storagebox_snapshot, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 54 - }, [ - FetchUrlCall('POST', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storagebox with ID 54 does not exist', - } - }) - .expect_url(BASE_URL + '/storagebox/54/snapshot') - ]) - assert result['msg'] == 'Storagebox with ID 54 does not exist' - - def test_snapshot_name_unknown(self, mocker): - result = self.run_module_failed(mocker, storagebox_snapshot, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, - 'snapshot_name': '2038-01-19T03:14:17', - 'snapshot_comment': 'Test comment' - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_EXISTING_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - ]) - assert result['msg'] == 'Snapshot with name 2038-01-19T03:14:17 does not exist' - - def test_snapshot_name_with_state_present(self, mocker): - result = self.run_module_failed(mocker, storagebox_snapshot, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, - 'snapshot_name': '2038-01-19T03:14:17', - 'state': 'present' - }, []) - assert result['msg'] == "snapshot_comment is required when updating a snapshot" - class TestHetznerStorageboxSnapshotPlanInfo(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_snapshot.AnsibleModule' diff --git a/tests/unit/plugins/modules/test_storagebox_snapshot_info.py b/tests/unit/plugins/modules/test_storagebox_snapshot_info.py index d2fab094..0606af52 100644 --- a/tests/unit/plugins/modules/test_storagebox_snapshot_info.py +++ b/tests/unit/plugins/modules/test_storagebox_snapshot_info.py @@ -12,34 +12,9 @@ ) from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox_snapshot_info -LEGACY_STORAGEBOX_SNAPSHOTS = [ - { - "snapshot": { - "name": "2015-12-21T12-40-38", - "timestamp": "2015-12-21T13:40:38+00:00", - "size": 400, - "filesystem_size": 12345, - "automatic": False, - "comment": "Test-Snapshot 1" - } - }, - { - "snapshot": { - "name": "2025-01-24T12-00-00", - "timestamp": "2025-01-24T12:00:00+00:00", - "size": 10000, - "filesystem_size": 22345, - "automatic": False, - "comment": "Test-Snapshot 2" - } - } -] - - STORAGEBOX_SNAPSHOTS = { "snapshots": [ { @@ -84,36 +59,12 @@ class TestHetznerStorageboxSnapshotPlanInfoLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_snapshot_info.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_snapshot(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23}, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_SNAPSHOTS) - .expect_url(BASE_URL + '/storagebox/23/snapshot') - ]) - assert result['changed'] is False - assert len(result['snapshots']) == 2 - assert result['snapshots'][0] == LEGACY_STORAGEBOX_SNAPSHOTS[0]['snapshot'] - def test_storagebox_id_unknown(self, mocker): result = self.run_module_failed(mocker, storagebox_snapshot_info, { 'hetzner_user': '', 'hetzner_password': '', 'storagebox_id': 23 }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storage Box with ID 23 does not exist', - } - }) - .expect_url(BASE_URL + '/storagebox/23/snapshot') ]) assert result['msg'] == 'Storagebox with ID 23 does not exist' diff --git a/tests/unit/plugins/modules/test_storagebox_snapshot_plan.py b/tests/unit/plugins/modules/test_storagebox_snapshot_plan.py index 999f0a89..1ff495df 100644 --- a/tests/unit/plugins/modules/test_storagebox_snapshot_plan.py +++ b/tests/unit/plugins/modules/test_storagebox_snapshot_plan.py @@ -14,38 +14,9 @@ from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import MagicMock from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox_snapshot_plan -LEGACY_STORAGEBOX_PLAN_ENABLED = [ - { - 'snapshotplan': { - 'status': 'enabled', - 'minute': 5, - 'hour': 12, - 'day_of_week': 2, - 'day_of_month': None, - 'month': None, - 'max_snapshots': 2, - }, - }, -] - -LEGACY_STORAGEBOX_PLAN_DISABLED = [ - { - 'snapshotplan': { - 'status': 'disabled', - 'minute': None, - 'hour': None, - 'day_of_week': None, - 'day_of_month': None, - 'month': None, - 'max_snapshots': None, - }, - }, -] - STORAGEBOX_PLAN_ENABLED = { "storage_box": { 'id': 23, @@ -197,223 +168,29 @@ } -def legacy_update_plan(plan, **values): - def update(p): - p = dict(p) - p.update(values) - return p - - return [update(p) for p in plan] - - class TestHetznerStorageboxSnapshotPlanLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_snapshot_plan.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_idempotent(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - 'plans': [ - LEGACY_STORAGEBOX_PLAN_ENABLED[0]['snapshotplan'], - ], - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_ENABLED[0]['snapshotplan'] - def test_id_unknown(self, mocker): result = self.run_module_failed(mocker, storagebox_snapshot_plan, { 'hetzner_user': '', 'hetzner_password': '', 'storagebox_id': 1, - 'plans': [ - LEGACY_STORAGEBOX_PLAN_ENABLED[0]['snapshotplan'], - ], - }, [ - FetchUrlCall('GET', 404) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storagebox not found', - }, - }) - .expect_url('{0}/storagebox/1/snapshotplan'.format(BASE_URL)), - ]) - assert result['msg'] == 'Storagebox with ID 1 does not exist' - - def test_wrong_number_of_plans(self, mocker): - result = self.run_module_failed(mocker, storagebox_snapshot_plan, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, - 'plans': [], - }, [ - ]) - assert result['msg'] == '`plans` must have exactly one element' - - def test_invalid_input(self, mocker): - result = self.run_module_failed(mocker, storagebox_snapshot_plan, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, - 'plans': [ - { - 'status': 'enabled', - 'hour': 25, - 'minute': 0, - 'max_snapshots': -1, - }, - ], - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - # The actual API does not return a list, but its only entry directly - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED[0]) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - FetchUrlCall('POST', 400) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json({ - 'error': { - 'status': 400, - 'code': 'INVALID_INPUT', - 'message': 'Invalid input', - 'invalid': ['hour', 'max_snapshots'], - 'missing': None, - }, - }) - .expect_form_value('status', 'enabled') - .expect_form_value('hour', '25') - .expect_form_value('minute', '0') - .expect_form_value_absent('day_of_week') - .expect_form_value_absent('day_of_month') - .expect_form_value_absent('month') - .expect_form_value('max_snapshots', '-1') - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['msg'] == 'The values to update were invalid (hour, max_snapshots)' - - def test_disable(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, - 'plans': [ - { - 'status': 'disabled', - }, - ], - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - FetchUrlCall('POST', 400) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_DISABLED) - .expect_form_value('status', 'disabled') - .expect_form_value('hour', '0') # should be absent, but API does not permit that - .expect_form_value('minute', '0') # should be absent, but API does not permit that - .expect_form_value_absent('day_of_week') - .expect_form_value_absent('day_of_month') - .expect_form_value_absent('month') - .expect_form_value_absent('max_snapshots') - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is True - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_DISABLED[0]['snapshotplan'] - - def test_enable(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, 'plans': [ { 'status': 'enabled', 'minute': 5, 'hour': 12, 'day_of_week': 2, - 'max_snapshots': 2, - }, - ], - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_DISABLED) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - FetchUrlCall('POST', 400) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED) - .expect_form_value('status', 'enabled') - .expect_form_value('hour', '12') - .expect_form_value('minute', '5') - .expect_form_value('day_of_week', '2') - .expect_form_value_absent('day_of_month') - .expect_form_value_absent('month') - .expect_form_value('max_snapshots', '2') - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is True - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_ENABLED[0]['snapshotplan'] - - def test_change(self, mocker): - updated_plan = legacy_update_plan(LEGACY_STORAGEBOX_PLAN_ENABLED, day_of_week=None, day_of_month=1) - result = self.run_module_success(mocker, storagebox_snapshot_plan, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 23, - 'plans': [ - { - 'status': 'enabled', - 'minute': 5, - 'hour': 12, - 'day_of_month': 1, + 'day_of_month': None, 'month': None, 'max_snapshots': 2, }, ], }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - FetchUrlCall('POST', 400) - .expect_basic_auth('', '') - .expect_force_basic_auth(True) - # The actual API does not return a list, but its only entry directly - .result_json(updated_plan[0]) - .expect_form_value('status', 'enabled') - .expect_form_value('hour', '12') - .expect_form_value('minute', '5') - .expect_form_value_absent('day_of_week') - .expect_form_value('day_of_month', '1') - .expect_form_value_absent('month') - .expect_form_value('max_snapshots', '2') - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), ]) - assert result['changed'] is True - assert len(result['plans']) == 1 - assert result['plans'][0] == updated_plan[0]['snapshotplan'] + assert result['msg'] == 'Storagebox with ID 1 does not exist' class TestHetznerStorageboxSnapshotPlan(BaseTestModule): diff --git a/tests/unit/plugins/modules/test_storagebox_snapshot_plan_info.py b/tests/unit/plugins/modules/test_storagebox_snapshot_plan_info.py index 6f1448ba..c8b3bfdf 100644 --- a/tests/unit/plugins/modules/test_storagebox_snapshot_plan_info.py +++ b/tests/unit/plugins/modules/test_storagebox_snapshot_plan_info.py @@ -12,38 +12,9 @@ ) from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import storagebox_snapshot_plan_info -LEGACY_STORAGEBOX_PLAN_ENABLED = [ - { - 'snapshotplan': { - 'status': 'enabled', - 'minute': 5, - 'hour': 12, - 'day_of_week': 2, - 'day_of_month': None, - 'month': None, - 'max_snapshots': 2, - }, - }, -] - -LEGACY_STORAGEBOX_PLAN_DISABLED = [ - { - 'snapshotplan': { - 'status': 'disabled', - 'minute': None, - 'hour': None, - 'day_of_week': None, - 'day_of_month': None, - 'month': None, - 'max_snapshots': None, - }, - }, -] - STORAGEBOX_PLAN_ENABLED = { "storage_box": { 'id': 23, @@ -199,87 +170,12 @@ class TestHetznerStorageboxSnapshotPlanInfoLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_snapshot_plan_info.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_regular_enabled(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_ENABLED[0]['snapshotplan'] - - def test_regular_disabled(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_PLAN_DISABLED) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_DISABLED[0]['snapshotplan'] - - def test_actual_enabled(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - # The actual API does not return a list, but its only entry directly - .result_json(LEGACY_STORAGEBOX_PLAN_ENABLED[0]) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_ENABLED[0]['snapshotplan'] - - def test_actual_disabled(self, mocker): - result = self.run_module_success(mocker, storagebox_snapshot_plan_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23, - }, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - # The actual API does not return a list, but its only entry directly - .result_json(LEGACY_STORAGEBOX_PLAN_DISABLED[0]) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), - ]) - assert result['changed'] is False - assert len(result['plans']) == 1 - assert result['plans'][0] == LEGACY_STORAGEBOX_PLAN_DISABLED[0]['snapshotplan'] - def test_server_number_unknown(self, mocker): result = self.run_module_failed(mocker, storagebox_snapshot_plan_info, { 'hetzner_user': '', 'hetzner_password': '', 'storagebox_id': 23, }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'server not found', - }, - }) - .expect_url('{0}/storagebox/23/snapshotplan'.format(BASE_URL)), ]) assert result['msg'] == 'Storagebox with ID 23 does not exist' diff --git a/tests/unit/plugins/modules/test_storagebox_subaccount.py b/tests/unit/plugins/modules/test_storagebox_subaccount.py index b316b548..7d789ba9 100644 --- a/tests/unit/plugins/modules/test_storagebox_subaccount.py +++ b/tests/unit/plugins/modules/test_storagebox_subaccount.py @@ -15,45 +15,10 @@ from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import MagicMock from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import ( storagebox_subaccount, ) -LEGACY_STORAGEBOX_SUBACCOUNTS = [ - { - "subaccount": { - "username": "u2342-sub1", - "accountid": "u2342", - "server": "u12345-sub1.your-storagebox.de", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "createtime": "2017-05-24 00:00:00", - "comment": "Test account", - } - }, - { - "subaccount": { - "username": "u2342-sub2", - "accountid": "u2342", - "server": "u12345-sub2.your-storagebox.de", - "homedirectory": "test2", - "samba": False, - "ssh": True, - "external_reachability": True, - "webdav": True, - "readonly": False, - "createtime": "2025-01-24 00:00:00", - "comment": "Test account 2", - } - }, -] - - STORAGEBOX_SUBACCOUNTS = [ { "id": 1, @@ -102,879 +67,9 @@ def test_storagebox_id_unknown(self, mocker): 'hetzner_password': '', 'storagebox_id': 23, }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storage Box with ID 23 does not exist', - } - }) - .expect_url(BASE_URL + '/storagebox/23/subaccount') ]) assert result['msg'] == 'Storagebox with ID 23 does not exist' - def test_delete_unknown_subaccount_noop(self, mocker): - """Test deletion of a subaccount that doesn't exist (no-op).""" - result = self.run_module_success(mocker, storagebox_subaccount, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'state': 'absent', - 'username': 'ghost_user', - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ]) - - assert result['changed'] is False - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] is None - - def test_delete_existing_subaccount(self, mocker): - """Test successful deletion of an existing subaccount.""" - result = self.run_module_success(mocker, storagebox_subaccount, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'state': 'absent', - 'username': 'u2342-sub2', - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('DELETE', 200) - .result_json({}) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/u2342-sub2') - ]) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is True - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] is None - - def test_delete_existing_subaccount_idempotence_by_comment(self, mocker): - """Test successful deletion of an existing subaccount.""" - result = self.run_module_success(mocker, storagebox_subaccount, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'state': 'absent', - 'comment': 'Test account', - 'idempotence': 'comment' - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('DELETE', 200) - .result_json({}) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/u2342-sub1') - ]) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is True - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] is None - - def test_create_subaccount(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount' - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - .expect_form_value('homedirectory', '/data/newsub') - .expect_form_value('samba', 'true') - .expect_form_value('ssh', 'false') - .expect_form_value('webdav', 'true') - .expect_form_value('readonly', 'false') - .expect_form_value('comment', 'My new subaccount') - .result_json({ - 'subaccount': { - 'username': 'generated_user', - 'homedirectory': '/data/newsub', - 'password': 'autogeneratedpass123' - } - }), - ] - ) - - assert result['changed'] is True - assert result['created'] is True - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'generated_user', - 'homedirectory': '/data/newsub', - 'password': 'autogeneratedpass123', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - } - - # Ensures providing a username doesn't trigger accidental update (if username isn't known) - def test_create_subaccount_unknown_username(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount data - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - 'username': "I'll be ignored", - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - .expect_form_value('homedirectory', '/data/newsub') - .expect_form_value('samba', 'true') - .expect_form_value('ssh', 'false') - .expect_form_value('webdav', 'true') - .expect_form_value('readonly', 'false') - .expect_form_value('comment', 'My new subaccount') - .result_json({ - 'subaccount': { - 'username': 'generated_user', - 'homedirectory': '/data/newsub', - 'password': 'autogeneratedpass123' - } - }), - ] - ) - - assert result['changed'] is True - assert result['created'] is True - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'generated_user', - 'homedirectory': '/data/newsub', - 'password': 'autogeneratedpass123', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - } - - def test_create_subaccount_set_to_random(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'password_mode': 'set-to-random', - # subaccount - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - 'password': 'toto' - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - .expect_form_value('homedirectory', '/data/newsub') - .expect_form_value('samba', 'true') - .expect_form_value('ssh', 'false') - .expect_form_value('webdav', 'true') - .expect_form_value('readonly', 'false') - .expect_form_value('comment', 'My new subaccount') - .expect_form_value_absent('password') - .result_json({ - 'subaccount': { - 'username': 'generated_user', - 'homedirectory': '/data/newsub', - 'password': 'autogeneratedpass123' - } - }), - ] - ) - - assert result['changed'] is True - assert result['created'] is True - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'generated_user', - 'homedirectory': '/data/newsub', - 'password': 'autogeneratedpass123', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - } - - def test_create_subaccount_limit_exceeded(self, mocker): - result = self.run_module_failed( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount' - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 400) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - .expect_form_value('homedirectory', '/data/newsub') - .expect_form_value('samba', 'true') - .expect_form_value('ssh', 'false') - .expect_form_value('webdav', 'true') - .expect_form_value('readonly', 'false') - .expect_form_value('comment', 'My new subaccount') - .result_json({ - "error": { - "status": 400, - "code": "STORAGEBOX_SUBACCOUNT_LIMIT_EXCEEDED", - "message": "Too many requests" - } - }) - ] - ) - assert result['msg'] == "Subaccount limit exceeded" - - def test_create_subaccount_invalid_password(self, mocker): - result = self.run_module_failed( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'Invalid password attempt', - 'password': '123' - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 400) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - .expect_form_value("password", "123") - .result_json({ - "error": { - "status": 400, - "code": "STORAGEBOX_INVALID_PASSWORD", - "message": "Password does not meet security requirements" - } - }) - ] - ) - - assert result['msg'] == "Invalid password (says Hetzner)" - - def test_update_subaccount_noop(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "Test account", - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - ] - ) - - assert result['changed'] is False - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'Test account', - } - - def test_update_subaccount_noop_idempotence_by_comment(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - 'idempotence': 'comment', - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "Test account", - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - ] - ) - - assert result['changed'] is False - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'Test account', - } - - def test_update_subaccount(self, mocker): - input = { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "new comment", - } - result = self.run_module_success( - mocker, - storagebox_subaccount, - input, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('PUT', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/' + input['username']) - .expect_form_value("homedirectory", input["homedirectory"]) - .expect_form_value("samba", str(input["samba"]).lower()) - .expect_form_value("ssh", str(input["ssh"]).lower()) - .expect_form_value("external_reachability", str(input["external_reachability"]).lower()) - .expect_form_value("webdav", str(input["webdav"]).lower()) - .expect_form_value("readonly", str(input["readonly"]).lower()) - .expect_form_value("comment", input["comment"]) - .result_json({}) - ] - ) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is True - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'new comment', - } - - def test_update_subaccount_idempotence_by_comment(self, mocker): - input = { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - 'idempotence': 'comment', - "homedirectory": "/new/homedir", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "Test account", - } - result = self.run_module_success( - mocker, - storagebox_subaccount, - input, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('PUT', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/' + LEGACY_STORAGEBOX_SUBACCOUNTS[0]['subaccount']['username']) - .expect_form_value("homedirectory", input["homedirectory"]) - .expect_form_value("samba", str(input["samba"]).lower()) - .expect_form_value("ssh", str(input["ssh"]).lower()) - .expect_form_value("external_reachability", str(input["external_reachability"]).lower()) - .expect_form_value("webdav", str(input["webdav"]).lower()) - .expect_form_value("readonly", str(input["readonly"]).lower()) - .expect_form_value("comment", input["comment"]) - .result_json({}) - ] - ) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is True - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': '/new/homedir', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'Test account', - } - - def test_update_subaccount_set_to_random(self, mocker): - input = { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "new comment", - "password": "toto", - "password_mode": "set-to-random", - } - result = self.run_module_success( - mocker, - storagebox_subaccount, - input, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/' + input['username'] + '/password') - .expect_form_value_absent("password") - .result_json({"password": "newRandomPassword"}), - - FetchUrlCall('PUT', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/' + input['username']) - .expect_form_value("homedirectory", input["homedirectory"]) - .expect_form_value("samba", str(input["samba"]).lower()) - .expect_form_value("ssh", str(input["ssh"]).lower()) - .expect_form_value("external_reachability", str(input["external_reachability"]).lower()) - .expect_form_value("webdav", str(input["webdav"]).lower()) - .expect_form_value("readonly", str(input["readonly"]).lower()) - .expect_form_value("comment", input["comment"]) - .result_json({}) - ] - ) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is True - assert result['password_updated'] is True - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'new comment', - 'password': 'newRandomPassword', - } - - def test_update_password_only(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # account - "password": "newsecurepassword", - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "Test account", - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 200) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/u2342-sub1/password') - .expect_form_value("password", "newsecurepassword") - .result_json({"password": "newsecurepassword"}) - ] - ) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is True - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'Test account', - 'password': 'newsecurepassword', - } - - def test_update_subaccount_invalid_password(self, mocker): - result = self.run_module_failed( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount - "password": "123", - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "Test account", - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount'), - - FetchUrlCall('POST', 400) - .expect_url(BASE_URL + '/storagebox/1234/subaccount/u2342-sub1/password') - .expect_form_value("password", "123") - .result_json({ - "error": { - "status": 400, - "code": "STORAGEBOX_INVALID_PASSWORD", - "message": "Password does not meet security requirements" - } - }) - ] - ) - - assert result['msg'] == "Invalid password (says Hetzner)" - - def test_invalid_spec_create_no_homedir(self, mocker): - result = self.run_module_failed( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'state': 'present', - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ] - ) - - assert result['msg'] == "homedirectory is required when creating a new subaccount" - - # Check mode tests - def test_update_password_only_CHECK_MODE(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - '_ansible_check_mode': True, - # subaccount - "password": "newsecurepassword", - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "Test account", - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ] - ) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is True - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'Test account', - 'password': 'newsecurepassword', - } - - def test_create_subaccount_CHECK_MODE(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - # subaccount, - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - '_ansible_check_mode': True, - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ] - ) - - assert result['changed'] is True - assert result['created'] is True - assert result['deleted'] is False - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] == { - 'homedirectory': '/data/newsub', - 'samba': True, - 'ssh': False, - 'webdav': True, - 'readonly': False, - 'comment': 'My new subaccount', - } - - def test_update_subaccount_CHECK_MODE(self, mocker): - result = self.run_module_success( - mocker, - storagebox_subaccount, - { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - '_ansible_check_mode': True, - # subaccount - "username": "u2342-sub1", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "comment": "new comment", - - }, - [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ] - ) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is False - assert result['updated'] is True - assert result['password_updated'] is False - assert result['subaccount'] == { - 'username': 'u2342-sub1', - 'accountid': 'u2342', - 'server': 'u12345-sub1.your-storagebox.de', - 'homedirectory': 'test', - 'samba': True, - 'ssh': True, - 'external_reachability': True, - 'webdav': False, - 'readonly': False, - 'createtime': '2017-05-24 00:00:00', - 'comment': 'new comment', - } - - def test_delete_existing_subaccount_CHECK_MODE(self, mocker): - """Test successful deletion of an existing subaccount.""" - result = self.run_module_success(mocker, storagebox_subaccount, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'state': 'absent', - 'username': 'u2342-sub2', - '_ansible_check_mode': True, - }, [ - FetchUrlCall('GET', 200) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ]) - - assert result['changed'] is True - assert result['created'] is False - assert result['deleted'] is True - assert result['updated'] is False - assert result['password_updated'] is False - assert result['subaccount'] is None - - def test_broken_idempotence_same_comment_multiple_accounts(self, mocker): - result = self.run_module_failed(mocker, storagebox_subaccount, { - 'hetzner_user': '', - 'hetzner_password': '', - 'storagebox_id': 1234, - 'idempotence': 'comment', - 'comment': LEGACY_STORAGEBOX_SUBACCOUNTS[0]['subaccount']['comment'], - }, [ - FetchUrlCall('GET', 200) - .result_json([LEGACY_STORAGEBOX_SUBACCOUNTS[0], LEGACY_STORAGEBOX_SUBACCOUNTS[0]]) - .expect_url(BASE_URL + '/storagebox/1234/subaccount') - ]) - - assert result['msg'] == "More than one subaccount matched the idempotence criteria." - class TestHetznerStorageboxSubbacount(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_subaccount.AnsibleModule' diff --git a/tests/unit/plugins/modules/test_storagebox_subaccount_info.py b/tests/unit/plugins/modules/test_storagebox_subaccount_info.py index f43fad8a..643ade82 100644 --- a/tests/unit/plugins/modules/test_storagebox_subaccount_info.py +++ b/tests/unit/plugins/modules/test_storagebox_subaccount_info.py @@ -13,44 +13,10 @@ ) from ansible_collections.community.hrobot.plugins.module_utils.api import API_BASE_URL -from ansible_collections.community.hrobot.plugins.module_utils.robot import BASE_URL from ansible_collections.community.hrobot.plugins.modules import ( storagebox_subaccount_info, ) -LEGACY_STORAGEBOX_SUBACCOUNTS = [ - { - "subaccount": { - "username": "u2342-sub1", - "accountid": "u2342", - "server": "u12345-sub1.your-storagebox.de", - "homedirectory": "test", - "samba": True, - "ssh": True, - "external_reachability": True, - "webdav": False, - "readonly": False, - "createtime": "2017-05-24 00:00:00", - "comment": "Test account", - } - }, - { - "subaccount": { - "username": "u2342-sub2", - "accountid": "u2342", - "server": "u12345-sub2.your-storagebox.de", - "homedirectory": "test2", - "samba": False, - "ssh": True, - "external_reachability": True, - "webdav": True, - "readonly": False, - "createtime": "2025-01-24 00:00:00", - "comment": "Test account 2", - } - }, -] - STORAGEBOX_SUBACCOUNTS = { "subaccounts": [ { @@ -82,36 +48,12 @@ class TestHetznerStorageboxSubbacountInfoLegacy(BaseTestModule): MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.hrobot.plugins.modules.storagebox_subaccount_info.AnsibleModule' MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.hrobot.plugins.module_utils.robot.fetch_url' - def test_subaccounts(self, mocker): - result = self.run_module_success(mocker, storagebox_subaccount_info, { - 'hetzner_user': 'test', - 'hetzner_password': 'hunter2', - 'storagebox_id': 23}, [ - FetchUrlCall('GET', 200) - .expect_basic_auth('test', 'hunter2') - .expect_force_basic_auth(True) - .result_json(LEGACY_STORAGEBOX_SUBACCOUNTS) - .expect_url(BASE_URL + '/storagebox/23/subaccount') - ]) - assert result['changed'] is False - assert len(result['subaccounts']) == 2 - assert result['subaccounts'][0] == LEGACY_STORAGEBOX_SUBACCOUNTS[0]['subaccount'] - def test_storagebox_id_unknown(self, mocker): result = self.run_module_failed(mocker, storagebox_subaccount_info, { 'hetzner_user': '', 'hetzner_password': '', 'storagebox_id': 23 }, [ - FetchUrlCall('GET', 404) - .result_json({ - 'error': { - 'status': 404, - 'code': 'STORAGEBOX_NOT_FOUND', - 'message': 'Storage Box with ID 23 does not exist', - } - }) - .expect_url(BASE_URL + '/storagebox/23/subaccount') ]) assert result['msg'] == 'Storagebox with ID 23 does not exist'