From f493b964053bd452cb61bff08202d342075af1c8 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Wed, 29 Nov 2023 13:21:26 -0500 Subject: [PATCH 1/7] doc: add report_user_by_email.py to examples --- examples/report_user_by_email.py | 53 ++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100755 examples/report_user_by_email.py diff --git a/examples/report_user_by_email.py b/examples/report_user_by_email.py new file mode 100755 index 0000000..8704772 --- /dev/null +++ b/examples/report_user_by_email.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +""" Script to illustrate how to retrieve a user from the Duo Admin API using the associated email address""" + +from __future__ import absolute_import, print_function +import sys +import getpass +from pprint import pprint + +import duo_client +from six.moves import input + +argv_iter = iter(sys.argv[1:]) + + +def get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def main(): + """ Primary script execution code """ + # Configuration and information about objects to create. + admin_api = duo_client.Admin( + ikey=get_next_arg('Admin API integration key ("DI..."): '), + skey=get_next_arg('integration secret key: ', secure=True), + host=get_next_arg('API hostname ("api-....duosecurity.com"): '), + ) + + # Retrieve user info from API: + email_address = get_next_arg('E-mail address of user to retrieve: ') + req_params = {"email": email_address} + # There is no get_user_by_email in the duo_client_python library, so we use the generic api_call + user = admin_api.json_api_call( + method='GET', + path='/admin/v1/users', + params=req_params + ) + + if user: + pprint(user, indent=2) + else: + print(f"User with email [{email_address}] could not be found.") + + +if __name__ == '__main__': + main() From 48580c1cd1fdc7465762e1bcf180f9b0d88c5b70 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Thu, 30 Nov 2023 10:58:28 -0500 Subject: [PATCH 2/7] feat: add get_user_by_email() method to admin.py doc: add report_user_by_email.py to examples --- duo_client/admin.py | 552 ++++++++++++++++--------------- examples/report_user_by_email.py | 8 +- 2 files changed, 284 insertions(+), 276 deletions(-) diff --git a/duo_client/admin.py b/duo_client/admin.py index 884c6c3..ee50170 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -194,19 +194,19 @@ TOKEN_YUBIKEY = "yk" VALID_AUTHLOG_REQUEST_PARAMS = [ - "mintime", - "maxtime", - "limit", - "sort", - "next_offset", - "event_types", - "reasons", - "results", - "users", - "applications", - "groups", - "factors", - "api_version", + "mintime", + "maxtime", + "limit", + "sort", + "next_offset", + "event_types", + "reasons", + "results", + "users", + "applications", + "groups", + "factors", + "api_version", ] VALID_ACTIVITY_REQUEST_PARAMS = ["mintime", "maxtime", "limit", "sort", "next_offset"] @@ -220,12 +220,11 @@ def api_call(self, method, path, params): params['account_id'] = self.account_id return super(Admin, self).api_call( - method, - path, - params, + method, + path, + params, ) - @classmethod def _canonicalize_ip_whitelist(klass, ip_whitelist): if isinstance(ip_whitelist, six.string_types): @@ -280,7 +279,7 @@ def get_administrative_units(self, admin_id=None, group_id=None, params) iterator = self.get_administrative_units_iterator( - admin_id, group_id, integration_key) + admin_id, group_id, integration_key) return list(iterator) @@ -339,12 +338,12 @@ def get_administrator_log(self, # Sanity check mintime as unix timestamp, then transform to string mintime = str(int(mintime)) params = { - 'mintime': mintime, + 'mintime': mintime, } response = self.json_api_call( - 'GET', - '/admin/v1/logs/administrator', - params, + 'GET', + '/admin/v1/logs/administrator', + params, ) for row in response: row['eventtype'] = 'administrator' @@ -378,12 +377,12 @@ def get_offline_log(self, # Sanity check mintime as unix timestamp, then transform to string mintime = str(int(mintime)) params = { - 'mintime': mintime, + 'mintime': mintime, } response = self.json_api_call( - 'GET', - '/admin/v1/logs/offline_enrollment', - params, + 'GET', + '/admin/v1/logs/offline_enrollment', + params, ) return response @@ -486,21 +485,21 @@ def get_authentication_log(self, api_version=1, **kwargs): Raises RuntimeError on error. """ - if api_version not in [1,2]: + if api_version not in [1, 2]: raise ValueError("Invalid API Version") params = {} - if api_version == 1: #v1 + if api_version == 1: # v1 params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0; # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) warnings.warn( - 'The v1 Admin API for retrieving authentication log events ' - 'will be deprecated in a future release of the Duo Admin API. ' - 'Please migrate to the v2 API.', - DeprecationWarning) - else: #v2 + 'The v1 Admin API for retrieving authentication log events ' + 'will be deprecated in a future release of the Duo Admin API. ' + 'Please migrate to the v2 API.', + DeprecationWarning) + else: # v2 for k in kwargs: if kwargs[k] is not None and k in VALID_AUTHLOG_REQUEST_PARAMS: params[k] = kwargs[k] @@ -510,17 +509,15 @@ def get_authentication_log(self, api_version=1, **kwargs): # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) - if 'maxtime' not in params: params['maxtime'] = int(time.time()) * 1000 # Sanity check maxtime as unix timestamp, then transform to string params['maxtime'] = '{:d}'.format(int(params['maxtime'])) - response = self.json_api_call( - 'GET', - '/admin/v{}/logs/authentication'.format(api_version), - params, + 'GET', + '/admin/v{}/logs/authentication'.format(api_version), + params, ) if api_version == 1: @@ -613,16 +610,16 @@ def get_activity_logs(self, **kwargs): params['mintime'] = default_mintime params['mintime'] = str(int(params['mintime'])) if 'maxtime' not in params: - #if maxtime is not provided, the script defaults it to now + # if maxtime is not provided, the script defaults it to now params['maxtime'] = default_maxtime params['maxtime'] = str(int(params['maxtime'])) if 'limit' in params: params['limit'] = str(int(params['limit'])) response = self.json_api_call( - 'GET', - '/admin/v2/logs/activity', - params, + 'GET', + '/admin/v2/logs/activity', + params, ) for row in response['items']: row['eventtype'] = 'activity' @@ -681,7 +678,7 @@ def get_telephony_log(self, mintime=0, api_version=1, **kwargs): Raises RuntimeError on error. """ - if api_version not in [1,2]: + if api_version not in [1, 2]: raise ValueError("Invalid API Version") if api_version == 2: @@ -709,10 +706,28 @@ def get_users(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', '/admin/v1/users', {'limit': limit, 'offset': offset}) + 'GET', '/admin/v1/users', {'limit': limit, 'offset': offset}) return list(self.get_users_iterator()) + def get_user_by_email(self, email_address: str) -> list: + """ + Returns user specified by email_address. + + email_address - E-mail address of user to fetch + + Returns a list of 0 or 1 user objects. + + Raises RuntimeError on error. + """ + params = { + 'email': email_address, + } + response = self.json_api_call('GET', + '/admin/v1/users', + params) + return response + def get_user_by_id(self, user_id): """ Returns user specified by user_id. @@ -739,7 +754,7 @@ def get_users_by_name(self, username): Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } response = self.json_api_call('GET', '/admin/v1/users', @@ -758,7 +773,7 @@ def get_users_by_names(self, usernames): """ username_list = json.dumps(usernames) params = { - 'username_list': username_list, + 'username_list': username_list, } response = self.json_paging_api_call('GET', '/admin/v1/users', @@ -777,7 +792,7 @@ def get_users_by_ids(self, user_ids): """ user_id_list = json.dumps(user_ids) params = { - 'user_id_list': user_id_list, + 'user_id_list': user_id_list, } response = self.json_paging_api_call('GET', '/admin/v1/users', @@ -806,7 +821,7 @@ def add_user(self, username, realname=None, status=None, Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } if realname is not None: params['realname'] = realname @@ -914,8 +929,8 @@ def enroll_user(self, username, email, valid_secs=None): """ path = '/admin/v1/users/enroll' params = { - 'username': username, - 'email': email, + 'username': username, + 'email': email, } if valid_secs is not None: @@ -923,7 +938,8 @@ def enroll_user(self, username, email, valid_secs=None): return self.json_api_call('POST', path, params) - def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_uses=None, codes=None, preserve_existing=None): + def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_uses=None, codes=None, + preserve_existing=None): """ Replace a user's bypass codes with new codes. @@ -953,7 +969,7 @@ def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_ if codes is not None: params['codes'] = self._canonicalize_bypass_codes(codes) - + if preserve_existing is not None: params['preserve_existing'] = preserve_existing @@ -975,7 +991,6 @@ def get_user_bypass_codes_iterator(self, user_id): path = '/admin/v1/users/' + user_id + '/bypass_codes' return self.json_paging_api_call('GET', path, {}) - def get_user_bypass_codes(self, user_id, limit=None, offset=0): """ Returns a list of bypass codes associated with a user. @@ -995,7 +1010,7 @@ def get_user_bypass_codes(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/bypass_codes' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_bypass_codes_iterator(user_id)) @@ -1030,7 +1045,7 @@ def get_user_phones(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/phones' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_phones_iterator(user_id)) @@ -1048,7 +1063,7 @@ def add_user_phone(self, user_id, phone_id): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/phones' params = { - 'phone_id': phone_id, + 'phone_id': phone_id, } return self.json_api_call('POST', path, params) @@ -1067,7 +1082,7 @@ def delete_user_phone(self, user_id, phone_id): path = '/admin/v1/users/' + user_id + '/phones/' + phone_id params = {} return self.json_api_call('DELETE', path, - params) + params) def get_user_tokens_iterator(self, user_id): """ @@ -1100,7 +1115,7 @@ def get_user_tokens(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/tokens' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_tokens_iterator(user_id)) @@ -1118,7 +1133,7 @@ def add_user_token(self, user_id, token_id): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/tokens' params = { - 'token_id': token_id, + 'token_id': token_id, } return self.json_api_call('POST', path, params) @@ -1173,7 +1188,7 @@ def get_user_u2ftokens(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/u2ftokens' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_u2ftokens_iterator(user_id)) @@ -1213,7 +1228,7 @@ def get_user_webauthncredentials(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/webauthncredentials' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_webauthncredentials_iterator(user_id)) @@ -1248,7 +1263,7 @@ def get_user_groups(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/groups' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_groups_iterator(user_id)) @@ -1329,9 +1344,9 @@ def get_phones_generator(self): Returns a generator yielding phones. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/phones', - {} + 'GET', + '/admin/v1/phones', + {} ) def get_phones(self, limit=None, offset=0): @@ -1349,9 +1364,9 @@ def get_phones(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/phones', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/phones', + {'limit': limit, 'offset': offset} ) return list(self.get_phones_generator()) @@ -1386,7 +1401,7 @@ def get_phones_by_number(self, number, extension=None): if extension is not None: params['extension'] = extension response = self.json_api_call('GET', path, - params) + params) return response def add_phone(self, @@ -1431,7 +1446,7 @@ def add_phone(self, if postdelay is not None: params['postdelay'] = postdelay response = self.json_api_call('POST', path, - params) + params) return response def update_phone(self, phone_id, @@ -1477,7 +1492,7 @@ def update_phone(self, phone_id, if postdelay is not None: params['postdelay'] = postdelay response = self.json_api_call('POST', path, - params) + params) return response def delete_phone(self, phone_id): @@ -1534,7 +1549,7 @@ def send_sms_activation_to_phone(self, phone_id, if activation_msg is not None: params['activation_msg'] = activation_msg return self.json_api_call('POST', path, - params) + params) def create_activation_url(self, phone_id, valid_secs=None, @@ -1592,9 +1607,9 @@ def get_desktoptokens_generator(self): Returns a generator yielding desktoptokens. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/desktoptokens', - {} + 'GET', + '/admin/v1/desktoptokens', + {} ) def get_desktoptokens(self, limit=None, offset=0): @@ -1613,9 +1628,9 @@ def get_desktoptokens(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/desktoptokens', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/desktoptokens', + {'limit': limit, 'offset': offset} ) return list(self.get_desktoptokens_generator()) @@ -1648,7 +1663,7 @@ def add_desktoptoken(self, Raises RuntimeError on error. """ params = { - 'platform': platform, + 'platform': platform, } if name is not None: params['name'] = name @@ -1716,8 +1731,8 @@ def activate_desktoptoken(self, desktoptoken_id, valid_secs=None): params['valid_secs'] = str(valid_secs) quoted_id = six.moves.urllib.parse.quote_plus(desktoptoken_id) response = self.json_api_call('POST', - '/admin/v1/desktoptokens/%s/activate' % quoted_id, - params) + '/admin/v1/desktoptokens/%s/activate' % quoted_id, + params) return response def get_tokens_generator(self): @@ -1725,9 +1740,9 @@ def get_tokens_generator(self): Returns a generator yielding tokens. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/tokens', - {} + 'GET', + '/admin/v1/tokens', + {} ) def get_tokens(self, limit=None, offset=0): @@ -1745,9 +1760,9 @@ def get_tokens(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/tokens', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/tokens', + {'limit': limit, 'offset': offset} ) return list(self.get_tokens_generator()) @@ -1764,7 +1779,7 @@ def get_token_by_id(self, token_id): path = '/admin/v1/tokens/' + token_id params = {} response = self.json_api_call('GET', path, - params) + params) return response def get_tokens_by_serial(self, type, serial): @@ -1777,8 +1792,8 @@ def get_tokens_by_serial(self, type, serial): Returns a list of 0 or 1 token objects. """ params = { - 'type': type, - 'serial': serial, + 'type': type, + 'serial': serial, } response = self.json_api_call('GET', '/admin/v1/tokens', params) return response @@ -1808,7 +1823,7 @@ def add_hotp6_token(self, serial, secret, counter=None): if counter is not None: params['counter'] = str(int(counter)) response = self.json_api_call('POST', path, - params) + params) return response def add_hotp8_token(self, serial, secret, counter=None): @@ -1826,7 +1841,7 @@ def add_hotp8_token(self, serial, secret, counter=None): if counter is not None: params['counter'] = str(int(counter)) response = self.json_api_call('POST', path, - params) + params) return response def add_totp6_token(self, serial, secret, totp_step=None): @@ -1844,7 +1859,7 @@ def add_totp6_token(self, serial, secret, totp_step=None): if totp_step is not None: params['totp_step'] = str(int(totp_step)) response = self.json_api_call('POST', path, - params) + params) return response def add_totp8_token(self, serial, secret, totp_step=None): @@ -1862,7 +1877,7 @@ def add_totp8_token(self, serial, secret, totp_step=None): if totp_step is not None: params['totp_step'] = str(int(totp_step)) response = self.json_api_call('POST', path, - params) + params) return response def update_token(self, token_id, totp_step=None): @@ -1881,7 +1896,7 @@ def update_token(self, token_id, totp_step=None): if totp_step is not None: params['totp_step'] = totp_step response = self.json_api_call('POST', path, - params) + params) return response def add_yubikey_token(self, serial, private_id, aes_key): @@ -1894,10 +1909,10 @@ def add_yubikey_token(self, serial, private_id, aes_key): Returns newly added token object. """ path = '/admin/v1/tokens' - params = {'type': 'yk', 'serial': serial, 'private_id': private_id, + params = {'type': 'yk', 'serial': serial, 'private_id': private_id, 'aes_key': aes_key} response = self.json_api_call('POST', path, - params) + params) return response def resync_hotp_token(self, token_id, code1, code2, code3): @@ -2037,7 +2052,7 @@ def update_settings(self, params['fraud_email'] = fraud_email if fraud_email_enabled is not None: params['fraud_email_enabled'] = ('1' if - fraud_email_enabled else '0') + fraud_email_enabled else '0') if keypress_confirm is not None: params['keypress_confirm'] = keypress_confirm if keypress_fraud is not None: @@ -2054,16 +2069,16 @@ def update_settings(self, params['minimum_password_length'] = str(minimum_password_length) if password_requires_upper_alpha is not None: params['password_requires_upper_alpha'] = ('1' if - password_requires_upper_alpha else '0') + password_requires_upper_alpha else '0') if password_requires_lower_alpha is not None: params['password_requires_lower_alpha'] = ('1' if - password_requires_lower_alpha else '0') + password_requires_lower_alpha else '0') if password_requires_numeric is not None: params['password_requires_numeric'] = ('1' if - password_requires_numeric else '0') + password_requires_numeric else '0') if password_requires_special is not None: params['password_requires_special'] = ('1' if - password_requires_special else '0') + password_requires_special else '0') if helpdesk_bypass is not None: params['helpdesk_bypass'] = str(helpdesk_bypass) if helpdesk_bypass_expiration is not None: @@ -2072,24 +2087,24 @@ def update_settings(self, params['helpdesk_message'] = str(helpdesk_message) if helpdesk_can_send_enroll_email is not None: params['helpdesk_can_send_enroll_email'] = ('1' if - helpdesk_can_send_enroll_email else '0') + helpdesk_can_send_enroll_email else '0') if reactivation_url is not None: params['reactivation_url'] = reactivation_url if reactivation_integration_key is not None: params['reactivation_integration_key'] = reactivation_integration_key if security_checkup_enabled is not None: params['security_checkup_enabled'] = ('1' if - security_checkup_enabled else '0') + security_checkup_enabled else '0') if user_managers_can_put_users_in_bypass is not None: params['user_managers_can_put_users_in_bypass'] = ('1' if - user_managers_can_put_users_in_bypass else '0') + user_managers_can_put_users_in_bypass else '0') if email_activity_notification_enabled is not None: params['email_activity_notification_enabled'] = ( - '1' if email_activity_notification_enabled else '0' + '1' if email_activity_notification_enabled else '0' ) if push_activity_notification_enabled is not None: params['push_activity_notification_enabled'] = ( - '1' if push_activity_notification_enabled else '0' + '1' if push_activity_notification_enabled else '0' ) if not params: @@ -2101,45 +2116,45 @@ def update_settings(self, return response def set_allowed_admin_auth_methods(self, - push_enabled=None, - sms_enabled=None, - voice_enabled=None, - mobile_otp_enabled=None, - yubikey_enabled=None, - hardware_token_enabled=None, - ): + push_enabled=None, + sms_enabled=None, + voice_enabled=None, + mobile_otp_enabled=None, + yubikey_enabled=None, + hardware_token_enabled=None, + ): params = {} if push_enabled is not None: params['push_enabled'] = ( - '1' if push_enabled else '0') + '1' if push_enabled else '0') if sms_enabled is not None: params['sms_enabled'] = ( - '1' if sms_enabled else '0') + '1' if sms_enabled else '0') if mobile_otp_enabled is not None: params['mobile_otp_enabled'] = ( - '1' if mobile_otp_enabled else '0') + '1' if mobile_otp_enabled else '0') if hardware_token_enabled is not None: params['hardware_token_enabled'] = ( - '1' if hardware_token_enabled else '0') + '1' if hardware_token_enabled else '0') if yubikey_enabled is not None: params['yubikey_enabled'] = ( - '1' if yubikey_enabled else '0') + '1' if yubikey_enabled else '0') if voice_enabled is not None: params['voice_enabled'] = ( - '1' if voice_enabled else '0') + '1' if voice_enabled else '0') response = self.json_api_call( - 'POST', - '/admin/v1/admins/allowed_auth_methods', - params + 'POST', + '/admin/v1/admins/allowed_auth_methods', + params ) return response def get_allowed_admin_auth_methods(self): - params={} + params = {} response = self.json_api_call( - 'GET', - '/admin/v1/admins/allowed_auth_methods', - params + 'GET', + '/admin/v1/admins/allowed_auth_methods', + params ) return response @@ -2152,9 +2167,9 @@ def get_info_summary(self): """ params = {} response = self.json_api_call( - 'GET', - '/admin/v1/info/summary', - params + 'GET', + '/admin/v1/info/summary', + params ) return response @@ -2177,9 +2192,9 @@ def get_info_telephony_credits_used(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/telephony_credits_used', - params + 'GET', + '/admin/v1/info/telephony_credits_used', + params ) return response @@ -2212,9 +2227,9 @@ def get_authentication_attempts(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/authentication_attempts', - params + 'GET', + '/admin/v1/info/authentication_attempts', + params ) return response @@ -2249,9 +2264,9 @@ def get_user_authentication_attempts(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/user_authentication_attempts', - params + 'GET', + '/admin/v1/info/user_authentication_attempts', + params ) return response @@ -2260,9 +2275,9 @@ def get_groups_generator(self): Returns a generator yielding groups. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/groups', - {} + 'GET', + '/admin/v1/groups', + {} ) def get_groups_by_group_ids(self, group_ids): @@ -2277,9 +2292,9 @@ def get_groups_by_group_ids(self, group_ids): """ group_id_list = json.dumps(group_ids) return self.json_api_call( - 'GET', - '/admin/v1/groups', - {'group_id_list': group_id_list} + 'GET', + '/admin/v1/groups', + {'group_id_list': group_id_list} ) def get_groups(self, limit=None, offset=0): @@ -2298,9 +2313,9 @@ def get_groups(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/groups', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/groups', + {'limit': limit, 'offset': offset} ) return list(self.get_groups_generator()) @@ -2320,10 +2335,10 @@ def get_group(self, group_id, api_version=1): if api_version == 1: url = '/admin/v1/groups/' warnings.warn( - 'The v1 Admin API for group details will be deprecated ' - 'in a future release of the Duo Admin API. Please migrate to ' - 'the v2 API.', - DeprecationWarning) + 'The v1 Admin API for group details will be deprecated ' + 'in a future release of the Duo Admin API. Please migrate to ' + 'the v2 API.', + DeprecationWarning) elif api_version == 2: url = '/admin/v2/groups/' else: @@ -2343,12 +2358,12 @@ def get_group_users(self, group_id, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v2/groups/' + group_id + '/users', - { - 'limit': limit, - 'offset': offset, - }) + 'GET', + '/admin/v2/groups/' + group_id + '/users', + { + 'limit': limit, + 'offset': offset, + }) return list(self.get_group_users_iterator(group_id)) def get_group_users_iterator(self, group_id): @@ -2358,20 +2373,20 @@ def get_group_users_iterator(self, group_id): group_id - The id of the group (Required) """ return self.json_paging_api_call( - 'GET', - '/admin/v2/groups/' + group_id + '/users', - {} + 'GET', + '/admin/v2/groups/' + group_id + '/users', + {} ) def create_group(self, name, - desc=None, - status=None, - push_enabled=None, - sms_enabled=None, - voice_enabled=None, - mobile_otp_enabled=None, - u2f_enabled=None, - ): + desc=None, + status=None, + push_enabled=None, + sms_enabled=None, + voice_enabled=None, + mobile_otp_enabled=None, + u2f_enabled=None, + ): """ Create a new group. @@ -2384,7 +2399,7 @@ def create_group(self, name, mobile_otp_enabled - Mobile OTP restriction <>True/False (Optional) """ params = { - 'name': name, + 'name': name, } if desc is not None: params['desc'] = desc @@ -2401,9 +2416,9 @@ def create_group(self, name, if u2f_enabled is not None: params['u2f_enabled'] = '1' if u2f_enabled else '0' response = self.json_api_call( - 'POST', - '/admin/v1/groups', - params + 'POST', + '/admin/v1/groups', + params ) return response @@ -2414,9 +2429,9 @@ def delete_group(self, group_id): group_id - The id of the group (Required) """ return self.json_api_call( - 'DELETE', - '/admin/v1/groups/' + group_id, - {} + 'DELETE', + '/admin/v1/groups/' + group_id, + {} ) def modify_group(self, @@ -2429,7 +2444,7 @@ def modify_group(self, voice_enabled=None, mobile_otp_enabled=None, u2f_enabled=None, - ): + ): """ Modify a group @@ -2461,9 +2476,9 @@ def modify_group(self, if u2f_enabled is not None: params['u2f_enabled'] = '1' if u2f_enabled else '0' response = self.json_api_call( - 'POST', - '/admin/v1/groups/' + group_id, - params + 'POST', + '/admin/v1/groups/' + group_id, + params ) return response @@ -2472,9 +2487,9 @@ def get_integrations_generator(self): Returns a generator yielding integrations. """ return self.json_paging_api_call( - 'GET', - '/admin/v2/integrations', - {}, + 'GET', + '/admin/v2/integrations', + {}, ) def get_integrations(self, limit=None, offset=0): @@ -2492,9 +2507,9 @@ def get_integrations(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v2/integrations', - {'limit': limit, 'offset': offset}, + 'GET', + '/admin/v2/integrations', + {'limit': limit, 'offset': offset}, ) return list(self.get_integrations_generator()) @@ -2511,9 +2526,9 @@ def get_integration(self, integration_key): """ params = {} response = self.json_api_call( - 'GET', - '/admin/v2/integrations/' + integration_key, - params, + 'GET', + '/admin/v2/integrations/' + integration_key, + params, ) return response @@ -2603,12 +2618,12 @@ def create_integration(self, params['adminapi_read_log'] = '1' if adminapi_read_log else '0' if adminapi_read_resource is not None: params['adminapi_read_resource'] = ( - '1' if adminapi_read_resource else '0') + '1' if adminapi_read_resource else '0') if adminapi_settings is not None: params['adminapi_settings'] = '1' if adminapi_settings else '0' if adminapi_write_resource is not None: params['adminapi_write_resource'] = ( - '1' if adminapi_write_resource else '0') + '1' if adminapi_write_resource else '0') if groups_allowed is not None: params['groups_allowed'] = groups_allowed if self_service_allowed is not None: @@ -2618,7 +2633,7 @@ def create_integration(self, response = self.json_api_call('POST', '/admin/v2/integrations', params, - ) + ) return response def delete_integration(self, integration_key): @@ -2632,9 +2647,9 @@ def delete_integration(self, integration_key): integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) path = '/admin/v2/integrations/%s' % integration_key return self.json_api_call( - 'DELETE', - path, - {}, + 'DELETE', + path, + {}, ) def update_integration(self, @@ -2727,12 +2742,12 @@ def update_integration(self, params['adminapi_read_log'] = '1' if adminapi_read_log else '0' if adminapi_read_resource is not None: params['adminapi_read_resource'] = ( - '1' if adminapi_read_resource else '0') + '1' if adminapi_read_resource else '0') if adminapi_settings is not None: params['adminapi_settings'] = '1' if adminapi_settings else '0' if adminapi_write_resource is not None: params['adminapi_write_resource'] = ( - '1' if adminapi_write_resource else '0') + '1' if adminapi_write_resource else '0') if reset_secret_key is not None: params['reset_secret_key'] = '1' if groups_allowed is not None: @@ -2746,9 +2761,9 @@ def update_integration(self, raise TypeError("No new values were provided") response = self.json_api_call( - 'POST', - path, - params, + 'POST', + path, + params, ) return response @@ -2769,9 +2784,9 @@ def get_admins(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/admins', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/admins', + {'limit': limit, 'offset': offset} ) iterator = self.get_admins_iterator() @@ -2953,13 +2968,13 @@ def get_external_password_mgmt_statuses(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/admins/password_mgmt', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/admins/password_mgmt', + {'limit': limit, 'offset': offset} ) iterator = self.json_paging_api_call( - 'GET', '/admin/v1/admins/password_mgmt', {}) + 'GET', '/admin/v1/admins/password_mgmt', {}) return list(iterator) @@ -2981,8 +2996,8 @@ def get_external_password_mgmt_status_for_admin(self, admin_id): return response def update_admin_password_mgmt_status( - self, admin_id, has_external_password_mgmt=None, - password=None): + self, admin_id, has_external_password_mgmt=None, + password=None): """ Enable or disable an admin for external password management, and optionally set the password for an admin @@ -3033,7 +3048,7 @@ def update_logo(self, logo): Raises RuntimeError on error. """ params = { - 'logo': base64.b64encode(logo).decode(), + 'logo': base64.b64encode(logo).decode(), } return self.json_api_call('POST', '/admin/v1/logo', params) @@ -3182,9 +3197,9 @@ def get_bypass_codes_generator(self): Returns a generator yielding bypass codes. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/bypass_codes', - {} + 'GET', + '/admin/v1/bypass_codes', + {} ) def get_bypass_codes(self, limit=None, offset=0): @@ -3203,9 +3218,9 @@ def get_bypass_codes(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/bypass_codes', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/bypass_codes', + {'limit': limit, 'offset': offset} ) return list(self.get_bypass_codes_generator()) @@ -3237,19 +3252,19 @@ def sync_user(self, username, directory_key): Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } directory_key = six.moves.urllib.parse.quote_plus(directory_key) path = ( - '/admin/v1/users/directorysync/{directory_key}/syncuser').format( + '/admin/v1/users/directorysync/{directory_key}/syncuser').format( directory_key=directory_key) return self.json_api_call('POST', path, params) def get_trust_monitor_events_iterator( - self, - mintime, - maxtime, - event_type=None, + self, + mintime, + maxtime, + event_type=None, ): """ Returns a generator which yields trust monitor events. @@ -3272,27 +3287,27 @@ def get_trust_monitor_events_iterator( """ params = { - "mintime": "{}".format(mintime), - "maxtime": "{}".format(maxtime), + "mintime": "{}".format(mintime), + "maxtime": "{}".format(maxtime), } if event_type is not None: params["type"] = event_type return self.json_cursor_api_call( - "GET", - "/admin/v1/trust_monitor/events", - params, - lambda resp: resp["events"], + "GET", + "/admin/v1/trust_monitor/events", + params, + lambda resp: resp["events"], ) def get_trust_monitor_events_by_offset( - self, - mintime, - maxtime, - limit=None, - offset=None, - event_type=None, + self, + mintime, + maxtime, + limit=None, + offset=None, + event_type=None, ): """ Fetch Duo Trust Monitor Events from the Admin API. @@ -3319,8 +3334,8 @@ def get_trust_monitor_events_by_offset( """ params = { - "mintime": "{}".format(mintime), - "maxtime": "{}".format(maxtime), + "mintime": "{}".format(mintime), + "maxtime": "{}".format(maxtime), } if limit is not None: @@ -3333,9 +3348,9 @@ def get_trust_monitor_events_by_offset( params["type"] = event_type return self.json_api_call( - "GET", - "/admin/v1/trust_monitor/events", - params, + "GET", + "/admin/v1/trust_monitor/events", + params, ) def _quote_policy_id(self, policy_key): @@ -3348,9 +3363,9 @@ def get_policies_v2_iterator(self): """ return self.json_paging_api_call( - "GET", - "/admin/v2/policies", - {}, + "GET", + "/admin/v2/policies", + {}, ) def get_policies_v2(self, limit=None, offset=0): @@ -3366,9 +3381,9 @@ def get_policies_v2(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - "GET", - "/admin/v2/policies", - {"limit": limit, "offset": offset}, + "GET", + "/admin/v2/policies", + {"limit": limit, "offset": offset}, ) return list(self.get_policies_v2_iterator()) @@ -3396,8 +3411,8 @@ def update_policy_v2(self, policy_key, json_request): path = "/admin/v2/policies/" + self._quote_policy_id(policy_key) response = self.json_api_call("PUT", path, json_request) return response - - def update_policies_v2(self, sections, sections_to_delete, + + def update_policies_v2(self, sections, sections_to_delete, edit_list, edit_all_policies=False): """ Update the contents of multiple policies. @@ -3413,14 +3428,14 @@ def update_policies_v2(self, sections, sections_to_delete, """ path = "/admin/v2/policies/update" params = { - "policies_to_update": { - "edit_all_policies": edit_all_policies, - "edit_list": edit_list, - }, - "policy_changes": { - "sections": sections, - "sections_to_delete": sections_to_delete, - }, + "policies_to_update": { + "edit_all_policies": edit_all_policies, + "edit_list": edit_list, + }, + "policy_changes": { + "sections": sections, + "sections_to_delete": sections_to_delete, + }, } response = self.json_api_call("PUT", path, params) return response @@ -3435,7 +3450,7 @@ def create_policy_v2(self, json_request): path = "/admin/v2/policies" response = self.json_api_call("POST", path, json_request) return response - + def copy_policy_v2(self, policy_key, new_policy_names_list): """ Copy policy to multiple new policies. @@ -3449,8 +3464,8 @@ def copy_policy_v2(self, policy_key, new_policy_names_list): """ path = "/admin/v2/policies/copy" params = { - "policy_key": policy_key, - "new_policy_names_list": new_policy_names_list + "policy_key": policy_key, + "new_policy_names_list": new_policy_names_list } response = self.json_api_call("POST", path, params) return response @@ -3465,7 +3480,7 @@ def get_policy_v2(self, policy_key): path = "/admin/v2/policies/" + self._quote_policy_id(policy_key) response = self.json_api_call("GET", path, {}) return response - + def get_policy_summary_v2(self): """ Returns (dict) - summary of all policies and the applications @@ -3488,17 +3503,17 @@ def __init__(self, account_id, child_api_host=None, **kwargs): See the Client base class for other parameters. """ if not child_api_host: - child_api_host = Accounts.child_map.get(account_id, None) + child_api_host = Accounts.child_map.get(account_id, None) if child_api_host is None: child_api_host = kwargs.get('host') try: accounts_api = Accounts(**kwargs) accounts_api.get_child_accounts() - child_api_host = Accounts.child_map.get(account_id, kwargs['host']) + child_api_host = Accounts.child_map.get(account_id, kwargs['host']) except RuntimeError: pass kwargs['host'] = child_api_host - + super(AccountAdmin, self).__init__(**kwargs) self.account_id = account_id @@ -3526,7 +3541,7 @@ def set_edition(self, edition): Raises RuntimeError on error. """ params = { - 'edition': edition, + 'edition': edition, } return self.json_api_call('POST', @@ -3542,9 +3557,8 @@ def get_telephony_credits(self): Raises RuntimeError on error. """ return self.json_api_call('GET', - '/admin/v1/billing/telephony_credits', - params={}) - + '/admin/v1/billing/telephony_credits', + params={}) def set_telephony_credits(self, credits): """ @@ -3558,8 +3572,8 @@ def set_telephony_credits(self, credits): Raises RuntimeError on error. """ params = { - 'credits': str(credits), + 'credits': str(credits), } return self.json_api_call('POST', - '/admin/v1/billing/telephony_credits', - params) + '/admin/v1/billing/telephony_credits', + params) diff --git a/examples/report_user_by_email.py b/examples/report_user_by_email.py index 8704772..d8e135a 100755 --- a/examples/report_user_by_email.py +++ b/examples/report_user_by_email.py @@ -35,13 +35,7 @@ def main(): # Retrieve user info from API: email_address = get_next_arg('E-mail address of user to retrieve: ') - req_params = {"email": email_address} - # There is no get_user_by_email in the duo_client_python library, so we use the generic api_call - user = admin_api.json_api_call( - method='GET', - path='/admin/v1/users', - params=req_params - ) + user = admin_api.get_user_by_email(email_address) if user: pprint(user, indent=2) From 023cbe61d6ac5fb759f96a2e558ed4492e338351 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Tue, 12 Dec 2023 09:08:55 -0500 Subject: [PATCH 3/7] Revert "feat: add get_user_by_email() method to admin.py" This reverts commit 48580c1cd1fdc7465762e1bcf180f9b0d88c5b70. --- duo_client/admin.py | 552 +++++++++++++++---------------- examples/report_user_by_email.py | 8 +- 2 files changed, 276 insertions(+), 284 deletions(-) diff --git a/duo_client/admin.py b/duo_client/admin.py index ee50170..884c6c3 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -194,19 +194,19 @@ TOKEN_YUBIKEY = "yk" VALID_AUTHLOG_REQUEST_PARAMS = [ - "mintime", - "maxtime", - "limit", - "sort", - "next_offset", - "event_types", - "reasons", - "results", - "users", - "applications", - "groups", - "factors", - "api_version", + "mintime", + "maxtime", + "limit", + "sort", + "next_offset", + "event_types", + "reasons", + "results", + "users", + "applications", + "groups", + "factors", + "api_version", ] VALID_ACTIVITY_REQUEST_PARAMS = ["mintime", "maxtime", "limit", "sort", "next_offset"] @@ -220,11 +220,12 @@ def api_call(self, method, path, params): params['account_id'] = self.account_id return super(Admin, self).api_call( - method, - path, - params, + method, + path, + params, ) + @classmethod def _canonicalize_ip_whitelist(klass, ip_whitelist): if isinstance(ip_whitelist, six.string_types): @@ -279,7 +280,7 @@ def get_administrative_units(self, admin_id=None, group_id=None, params) iterator = self.get_administrative_units_iterator( - admin_id, group_id, integration_key) + admin_id, group_id, integration_key) return list(iterator) @@ -338,12 +339,12 @@ def get_administrator_log(self, # Sanity check mintime as unix timestamp, then transform to string mintime = str(int(mintime)) params = { - 'mintime': mintime, + 'mintime': mintime, } response = self.json_api_call( - 'GET', - '/admin/v1/logs/administrator', - params, + 'GET', + '/admin/v1/logs/administrator', + params, ) for row in response: row['eventtype'] = 'administrator' @@ -377,12 +378,12 @@ def get_offline_log(self, # Sanity check mintime as unix timestamp, then transform to string mintime = str(int(mintime)) params = { - 'mintime': mintime, + 'mintime': mintime, } response = self.json_api_call( - 'GET', - '/admin/v1/logs/offline_enrollment', - params, + 'GET', + '/admin/v1/logs/offline_enrollment', + params, ) return response @@ -485,21 +486,21 @@ def get_authentication_log(self, api_version=1, **kwargs): Raises RuntimeError on error. """ - if api_version not in [1, 2]: + if api_version not in [1,2]: raise ValueError("Invalid API Version") params = {} - if api_version == 1: # v1 + if api_version == 1: #v1 params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0; # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) warnings.warn( - 'The v1 Admin API for retrieving authentication log events ' - 'will be deprecated in a future release of the Duo Admin API. ' - 'Please migrate to the v2 API.', - DeprecationWarning) - else: # v2 + 'The v1 Admin API for retrieving authentication log events ' + 'will be deprecated in a future release of the Duo Admin API. ' + 'Please migrate to the v2 API.', + DeprecationWarning) + else: #v2 for k in kwargs: if kwargs[k] is not None and k in VALID_AUTHLOG_REQUEST_PARAMS: params[k] = kwargs[k] @@ -509,15 +510,17 @@ def get_authentication_log(self, api_version=1, **kwargs): # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) + if 'maxtime' not in params: params['maxtime'] = int(time.time()) * 1000 # Sanity check maxtime as unix timestamp, then transform to string params['maxtime'] = '{:d}'.format(int(params['maxtime'])) + response = self.json_api_call( - 'GET', - '/admin/v{}/logs/authentication'.format(api_version), - params, + 'GET', + '/admin/v{}/logs/authentication'.format(api_version), + params, ) if api_version == 1: @@ -610,16 +613,16 @@ def get_activity_logs(self, **kwargs): params['mintime'] = default_mintime params['mintime'] = str(int(params['mintime'])) if 'maxtime' not in params: - # if maxtime is not provided, the script defaults it to now + #if maxtime is not provided, the script defaults it to now params['maxtime'] = default_maxtime params['maxtime'] = str(int(params['maxtime'])) if 'limit' in params: params['limit'] = str(int(params['limit'])) response = self.json_api_call( - 'GET', - '/admin/v2/logs/activity', - params, + 'GET', + '/admin/v2/logs/activity', + params, ) for row in response['items']: row['eventtype'] = 'activity' @@ -678,7 +681,7 @@ def get_telephony_log(self, mintime=0, api_version=1, **kwargs): Raises RuntimeError on error. """ - if api_version not in [1, 2]: + if api_version not in [1,2]: raise ValueError("Invalid API Version") if api_version == 2: @@ -706,28 +709,10 @@ def get_users(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', '/admin/v1/users', {'limit': limit, 'offset': offset}) + 'GET', '/admin/v1/users', {'limit': limit, 'offset': offset}) return list(self.get_users_iterator()) - def get_user_by_email(self, email_address: str) -> list: - """ - Returns user specified by email_address. - - email_address - E-mail address of user to fetch - - Returns a list of 0 or 1 user objects. - - Raises RuntimeError on error. - """ - params = { - 'email': email_address, - } - response = self.json_api_call('GET', - '/admin/v1/users', - params) - return response - def get_user_by_id(self, user_id): """ Returns user specified by user_id. @@ -754,7 +739,7 @@ def get_users_by_name(self, username): Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } response = self.json_api_call('GET', '/admin/v1/users', @@ -773,7 +758,7 @@ def get_users_by_names(self, usernames): """ username_list = json.dumps(usernames) params = { - 'username_list': username_list, + 'username_list': username_list, } response = self.json_paging_api_call('GET', '/admin/v1/users', @@ -792,7 +777,7 @@ def get_users_by_ids(self, user_ids): """ user_id_list = json.dumps(user_ids) params = { - 'user_id_list': user_id_list, + 'user_id_list': user_id_list, } response = self.json_paging_api_call('GET', '/admin/v1/users', @@ -821,7 +806,7 @@ def add_user(self, username, realname=None, status=None, Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } if realname is not None: params['realname'] = realname @@ -929,8 +914,8 @@ def enroll_user(self, username, email, valid_secs=None): """ path = '/admin/v1/users/enroll' params = { - 'username': username, - 'email': email, + 'username': username, + 'email': email, } if valid_secs is not None: @@ -938,8 +923,7 @@ def enroll_user(self, username, email, valid_secs=None): return self.json_api_call('POST', path, params) - def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_uses=None, codes=None, - preserve_existing=None): + def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_uses=None, codes=None, preserve_existing=None): """ Replace a user's bypass codes with new codes. @@ -969,7 +953,7 @@ def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_ if codes is not None: params['codes'] = self._canonicalize_bypass_codes(codes) - + if preserve_existing is not None: params['preserve_existing'] = preserve_existing @@ -991,6 +975,7 @@ def get_user_bypass_codes_iterator(self, user_id): path = '/admin/v1/users/' + user_id + '/bypass_codes' return self.json_paging_api_call('GET', path, {}) + def get_user_bypass_codes(self, user_id, limit=None, offset=0): """ Returns a list of bypass codes associated with a user. @@ -1010,7 +995,7 @@ def get_user_bypass_codes(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/bypass_codes' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_bypass_codes_iterator(user_id)) @@ -1045,7 +1030,7 @@ def get_user_phones(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/phones' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_phones_iterator(user_id)) @@ -1063,7 +1048,7 @@ def add_user_phone(self, user_id, phone_id): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/phones' params = { - 'phone_id': phone_id, + 'phone_id': phone_id, } return self.json_api_call('POST', path, params) @@ -1082,7 +1067,7 @@ def delete_user_phone(self, user_id, phone_id): path = '/admin/v1/users/' + user_id + '/phones/' + phone_id params = {} return self.json_api_call('DELETE', path, - params) + params) def get_user_tokens_iterator(self, user_id): """ @@ -1115,7 +1100,7 @@ def get_user_tokens(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/tokens' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_tokens_iterator(user_id)) @@ -1133,7 +1118,7 @@ def add_user_token(self, user_id, token_id): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/tokens' params = { - 'token_id': token_id, + 'token_id': token_id, } return self.json_api_call('POST', path, params) @@ -1188,7 +1173,7 @@ def get_user_u2ftokens(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/u2ftokens' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_u2ftokens_iterator(user_id)) @@ -1228,7 +1213,7 @@ def get_user_webauthncredentials(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/webauthncredentials' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_webauthncredentials_iterator(user_id)) @@ -1263,7 +1248,7 @@ def get_user_groups(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/groups' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_groups_iterator(user_id)) @@ -1344,9 +1329,9 @@ def get_phones_generator(self): Returns a generator yielding phones. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/phones', - {} + 'GET', + '/admin/v1/phones', + {} ) def get_phones(self, limit=None, offset=0): @@ -1364,9 +1349,9 @@ def get_phones(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/phones', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/phones', + {'limit': limit, 'offset': offset} ) return list(self.get_phones_generator()) @@ -1401,7 +1386,7 @@ def get_phones_by_number(self, number, extension=None): if extension is not None: params['extension'] = extension response = self.json_api_call('GET', path, - params) + params) return response def add_phone(self, @@ -1446,7 +1431,7 @@ def add_phone(self, if postdelay is not None: params['postdelay'] = postdelay response = self.json_api_call('POST', path, - params) + params) return response def update_phone(self, phone_id, @@ -1492,7 +1477,7 @@ def update_phone(self, phone_id, if postdelay is not None: params['postdelay'] = postdelay response = self.json_api_call('POST', path, - params) + params) return response def delete_phone(self, phone_id): @@ -1549,7 +1534,7 @@ def send_sms_activation_to_phone(self, phone_id, if activation_msg is not None: params['activation_msg'] = activation_msg return self.json_api_call('POST', path, - params) + params) def create_activation_url(self, phone_id, valid_secs=None, @@ -1607,9 +1592,9 @@ def get_desktoptokens_generator(self): Returns a generator yielding desktoptokens. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/desktoptokens', - {} + 'GET', + '/admin/v1/desktoptokens', + {} ) def get_desktoptokens(self, limit=None, offset=0): @@ -1628,9 +1613,9 @@ def get_desktoptokens(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/desktoptokens', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/desktoptokens', + {'limit': limit, 'offset': offset} ) return list(self.get_desktoptokens_generator()) @@ -1663,7 +1648,7 @@ def add_desktoptoken(self, Raises RuntimeError on error. """ params = { - 'platform': platform, + 'platform': platform, } if name is not None: params['name'] = name @@ -1731,8 +1716,8 @@ def activate_desktoptoken(self, desktoptoken_id, valid_secs=None): params['valid_secs'] = str(valid_secs) quoted_id = six.moves.urllib.parse.quote_plus(desktoptoken_id) response = self.json_api_call('POST', - '/admin/v1/desktoptokens/%s/activate' % quoted_id, - params) + '/admin/v1/desktoptokens/%s/activate' % quoted_id, + params) return response def get_tokens_generator(self): @@ -1740,9 +1725,9 @@ def get_tokens_generator(self): Returns a generator yielding tokens. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/tokens', - {} + 'GET', + '/admin/v1/tokens', + {} ) def get_tokens(self, limit=None, offset=0): @@ -1760,9 +1745,9 @@ def get_tokens(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/tokens', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/tokens', + {'limit': limit, 'offset': offset} ) return list(self.get_tokens_generator()) @@ -1779,7 +1764,7 @@ def get_token_by_id(self, token_id): path = '/admin/v1/tokens/' + token_id params = {} response = self.json_api_call('GET', path, - params) + params) return response def get_tokens_by_serial(self, type, serial): @@ -1792,8 +1777,8 @@ def get_tokens_by_serial(self, type, serial): Returns a list of 0 or 1 token objects. """ params = { - 'type': type, - 'serial': serial, + 'type': type, + 'serial': serial, } response = self.json_api_call('GET', '/admin/v1/tokens', params) return response @@ -1823,7 +1808,7 @@ def add_hotp6_token(self, serial, secret, counter=None): if counter is not None: params['counter'] = str(int(counter)) response = self.json_api_call('POST', path, - params) + params) return response def add_hotp8_token(self, serial, secret, counter=None): @@ -1841,7 +1826,7 @@ def add_hotp8_token(self, serial, secret, counter=None): if counter is not None: params['counter'] = str(int(counter)) response = self.json_api_call('POST', path, - params) + params) return response def add_totp6_token(self, serial, secret, totp_step=None): @@ -1859,7 +1844,7 @@ def add_totp6_token(self, serial, secret, totp_step=None): if totp_step is not None: params['totp_step'] = str(int(totp_step)) response = self.json_api_call('POST', path, - params) + params) return response def add_totp8_token(self, serial, secret, totp_step=None): @@ -1877,7 +1862,7 @@ def add_totp8_token(self, serial, secret, totp_step=None): if totp_step is not None: params['totp_step'] = str(int(totp_step)) response = self.json_api_call('POST', path, - params) + params) return response def update_token(self, token_id, totp_step=None): @@ -1896,7 +1881,7 @@ def update_token(self, token_id, totp_step=None): if totp_step is not None: params['totp_step'] = totp_step response = self.json_api_call('POST', path, - params) + params) return response def add_yubikey_token(self, serial, private_id, aes_key): @@ -1909,10 +1894,10 @@ def add_yubikey_token(self, serial, private_id, aes_key): Returns newly added token object. """ path = '/admin/v1/tokens' - params = {'type': 'yk', 'serial': serial, 'private_id': private_id, + params = {'type': 'yk', 'serial': serial, 'private_id': private_id, 'aes_key': aes_key} response = self.json_api_call('POST', path, - params) + params) return response def resync_hotp_token(self, token_id, code1, code2, code3): @@ -2052,7 +2037,7 @@ def update_settings(self, params['fraud_email'] = fraud_email if fraud_email_enabled is not None: params['fraud_email_enabled'] = ('1' if - fraud_email_enabled else '0') + fraud_email_enabled else '0') if keypress_confirm is not None: params['keypress_confirm'] = keypress_confirm if keypress_fraud is not None: @@ -2069,16 +2054,16 @@ def update_settings(self, params['minimum_password_length'] = str(minimum_password_length) if password_requires_upper_alpha is not None: params['password_requires_upper_alpha'] = ('1' if - password_requires_upper_alpha else '0') + password_requires_upper_alpha else '0') if password_requires_lower_alpha is not None: params['password_requires_lower_alpha'] = ('1' if - password_requires_lower_alpha else '0') + password_requires_lower_alpha else '0') if password_requires_numeric is not None: params['password_requires_numeric'] = ('1' if - password_requires_numeric else '0') + password_requires_numeric else '0') if password_requires_special is not None: params['password_requires_special'] = ('1' if - password_requires_special else '0') + password_requires_special else '0') if helpdesk_bypass is not None: params['helpdesk_bypass'] = str(helpdesk_bypass) if helpdesk_bypass_expiration is not None: @@ -2087,24 +2072,24 @@ def update_settings(self, params['helpdesk_message'] = str(helpdesk_message) if helpdesk_can_send_enroll_email is not None: params['helpdesk_can_send_enroll_email'] = ('1' if - helpdesk_can_send_enroll_email else '0') + helpdesk_can_send_enroll_email else '0') if reactivation_url is not None: params['reactivation_url'] = reactivation_url if reactivation_integration_key is not None: params['reactivation_integration_key'] = reactivation_integration_key if security_checkup_enabled is not None: params['security_checkup_enabled'] = ('1' if - security_checkup_enabled else '0') + security_checkup_enabled else '0') if user_managers_can_put_users_in_bypass is not None: params['user_managers_can_put_users_in_bypass'] = ('1' if - user_managers_can_put_users_in_bypass else '0') + user_managers_can_put_users_in_bypass else '0') if email_activity_notification_enabled is not None: params['email_activity_notification_enabled'] = ( - '1' if email_activity_notification_enabled else '0' + '1' if email_activity_notification_enabled else '0' ) if push_activity_notification_enabled is not None: params['push_activity_notification_enabled'] = ( - '1' if push_activity_notification_enabled else '0' + '1' if push_activity_notification_enabled else '0' ) if not params: @@ -2116,45 +2101,45 @@ def update_settings(self, return response def set_allowed_admin_auth_methods(self, - push_enabled=None, - sms_enabled=None, - voice_enabled=None, - mobile_otp_enabled=None, - yubikey_enabled=None, - hardware_token_enabled=None, - ): + push_enabled=None, + sms_enabled=None, + voice_enabled=None, + mobile_otp_enabled=None, + yubikey_enabled=None, + hardware_token_enabled=None, + ): params = {} if push_enabled is not None: params['push_enabled'] = ( - '1' if push_enabled else '0') + '1' if push_enabled else '0') if sms_enabled is not None: params['sms_enabled'] = ( - '1' if sms_enabled else '0') + '1' if sms_enabled else '0') if mobile_otp_enabled is not None: params['mobile_otp_enabled'] = ( - '1' if mobile_otp_enabled else '0') + '1' if mobile_otp_enabled else '0') if hardware_token_enabled is not None: params['hardware_token_enabled'] = ( - '1' if hardware_token_enabled else '0') + '1' if hardware_token_enabled else '0') if yubikey_enabled is not None: params['yubikey_enabled'] = ( - '1' if yubikey_enabled else '0') + '1' if yubikey_enabled else '0') if voice_enabled is not None: params['voice_enabled'] = ( - '1' if voice_enabled else '0') + '1' if voice_enabled else '0') response = self.json_api_call( - 'POST', - '/admin/v1/admins/allowed_auth_methods', - params + 'POST', + '/admin/v1/admins/allowed_auth_methods', + params ) return response def get_allowed_admin_auth_methods(self): - params = {} + params={} response = self.json_api_call( - 'GET', - '/admin/v1/admins/allowed_auth_methods', - params + 'GET', + '/admin/v1/admins/allowed_auth_methods', + params ) return response @@ -2167,9 +2152,9 @@ def get_info_summary(self): """ params = {} response = self.json_api_call( - 'GET', - '/admin/v1/info/summary', - params + 'GET', + '/admin/v1/info/summary', + params ) return response @@ -2192,9 +2177,9 @@ def get_info_telephony_credits_used(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/telephony_credits_used', - params + 'GET', + '/admin/v1/info/telephony_credits_used', + params ) return response @@ -2227,9 +2212,9 @@ def get_authentication_attempts(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/authentication_attempts', - params + 'GET', + '/admin/v1/info/authentication_attempts', + params ) return response @@ -2264,9 +2249,9 @@ def get_user_authentication_attempts(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/user_authentication_attempts', - params + 'GET', + '/admin/v1/info/user_authentication_attempts', + params ) return response @@ -2275,9 +2260,9 @@ def get_groups_generator(self): Returns a generator yielding groups. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/groups', - {} + 'GET', + '/admin/v1/groups', + {} ) def get_groups_by_group_ids(self, group_ids): @@ -2292,9 +2277,9 @@ def get_groups_by_group_ids(self, group_ids): """ group_id_list = json.dumps(group_ids) return self.json_api_call( - 'GET', - '/admin/v1/groups', - {'group_id_list': group_id_list} + 'GET', + '/admin/v1/groups', + {'group_id_list': group_id_list} ) def get_groups(self, limit=None, offset=0): @@ -2313,9 +2298,9 @@ def get_groups(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/groups', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/groups', + {'limit': limit, 'offset': offset} ) return list(self.get_groups_generator()) @@ -2335,10 +2320,10 @@ def get_group(self, group_id, api_version=1): if api_version == 1: url = '/admin/v1/groups/' warnings.warn( - 'The v1 Admin API for group details will be deprecated ' - 'in a future release of the Duo Admin API. Please migrate to ' - 'the v2 API.', - DeprecationWarning) + 'The v1 Admin API for group details will be deprecated ' + 'in a future release of the Duo Admin API. Please migrate to ' + 'the v2 API.', + DeprecationWarning) elif api_version == 2: url = '/admin/v2/groups/' else: @@ -2358,12 +2343,12 @@ def get_group_users(self, group_id, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v2/groups/' + group_id + '/users', - { - 'limit': limit, - 'offset': offset, - }) + 'GET', + '/admin/v2/groups/' + group_id + '/users', + { + 'limit': limit, + 'offset': offset, + }) return list(self.get_group_users_iterator(group_id)) def get_group_users_iterator(self, group_id): @@ -2373,20 +2358,20 @@ def get_group_users_iterator(self, group_id): group_id - The id of the group (Required) """ return self.json_paging_api_call( - 'GET', - '/admin/v2/groups/' + group_id + '/users', - {} + 'GET', + '/admin/v2/groups/' + group_id + '/users', + {} ) def create_group(self, name, - desc=None, - status=None, - push_enabled=None, - sms_enabled=None, - voice_enabled=None, - mobile_otp_enabled=None, - u2f_enabled=None, - ): + desc=None, + status=None, + push_enabled=None, + sms_enabled=None, + voice_enabled=None, + mobile_otp_enabled=None, + u2f_enabled=None, + ): """ Create a new group. @@ -2399,7 +2384,7 @@ def create_group(self, name, mobile_otp_enabled - Mobile OTP restriction <>True/False (Optional) """ params = { - 'name': name, + 'name': name, } if desc is not None: params['desc'] = desc @@ -2416,9 +2401,9 @@ def create_group(self, name, if u2f_enabled is not None: params['u2f_enabled'] = '1' if u2f_enabled else '0' response = self.json_api_call( - 'POST', - '/admin/v1/groups', - params + 'POST', + '/admin/v1/groups', + params ) return response @@ -2429,9 +2414,9 @@ def delete_group(self, group_id): group_id - The id of the group (Required) """ return self.json_api_call( - 'DELETE', - '/admin/v1/groups/' + group_id, - {} + 'DELETE', + '/admin/v1/groups/' + group_id, + {} ) def modify_group(self, @@ -2444,7 +2429,7 @@ def modify_group(self, voice_enabled=None, mobile_otp_enabled=None, u2f_enabled=None, - ): + ): """ Modify a group @@ -2476,9 +2461,9 @@ def modify_group(self, if u2f_enabled is not None: params['u2f_enabled'] = '1' if u2f_enabled else '0' response = self.json_api_call( - 'POST', - '/admin/v1/groups/' + group_id, - params + 'POST', + '/admin/v1/groups/' + group_id, + params ) return response @@ -2487,9 +2472,9 @@ def get_integrations_generator(self): Returns a generator yielding integrations. """ return self.json_paging_api_call( - 'GET', - '/admin/v2/integrations', - {}, + 'GET', + '/admin/v2/integrations', + {}, ) def get_integrations(self, limit=None, offset=0): @@ -2507,9 +2492,9 @@ def get_integrations(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v2/integrations', - {'limit': limit, 'offset': offset}, + 'GET', + '/admin/v2/integrations', + {'limit': limit, 'offset': offset}, ) return list(self.get_integrations_generator()) @@ -2526,9 +2511,9 @@ def get_integration(self, integration_key): """ params = {} response = self.json_api_call( - 'GET', - '/admin/v2/integrations/' + integration_key, - params, + 'GET', + '/admin/v2/integrations/' + integration_key, + params, ) return response @@ -2618,12 +2603,12 @@ def create_integration(self, params['adminapi_read_log'] = '1' if adminapi_read_log else '0' if adminapi_read_resource is not None: params['adminapi_read_resource'] = ( - '1' if adminapi_read_resource else '0') + '1' if adminapi_read_resource else '0') if adminapi_settings is not None: params['adminapi_settings'] = '1' if adminapi_settings else '0' if adminapi_write_resource is not None: params['adminapi_write_resource'] = ( - '1' if adminapi_write_resource else '0') + '1' if adminapi_write_resource else '0') if groups_allowed is not None: params['groups_allowed'] = groups_allowed if self_service_allowed is not None: @@ -2633,7 +2618,7 @@ def create_integration(self, response = self.json_api_call('POST', '/admin/v2/integrations', params, - ) + ) return response def delete_integration(self, integration_key): @@ -2647,9 +2632,9 @@ def delete_integration(self, integration_key): integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) path = '/admin/v2/integrations/%s' % integration_key return self.json_api_call( - 'DELETE', - path, - {}, + 'DELETE', + path, + {}, ) def update_integration(self, @@ -2742,12 +2727,12 @@ def update_integration(self, params['adminapi_read_log'] = '1' if adminapi_read_log else '0' if adminapi_read_resource is not None: params['adminapi_read_resource'] = ( - '1' if adminapi_read_resource else '0') + '1' if adminapi_read_resource else '0') if adminapi_settings is not None: params['adminapi_settings'] = '1' if adminapi_settings else '0' if adminapi_write_resource is not None: params['adminapi_write_resource'] = ( - '1' if adminapi_write_resource else '0') + '1' if adminapi_write_resource else '0') if reset_secret_key is not None: params['reset_secret_key'] = '1' if groups_allowed is not None: @@ -2761,9 +2746,9 @@ def update_integration(self, raise TypeError("No new values were provided") response = self.json_api_call( - 'POST', - path, - params, + 'POST', + path, + params, ) return response @@ -2784,9 +2769,9 @@ def get_admins(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/admins', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/admins', + {'limit': limit, 'offset': offset} ) iterator = self.get_admins_iterator() @@ -2968,13 +2953,13 @@ def get_external_password_mgmt_statuses(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/admins/password_mgmt', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/admins/password_mgmt', + {'limit': limit, 'offset': offset} ) iterator = self.json_paging_api_call( - 'GET', '/admin/v1/admins/password_mgmt', {}) + 'GET', '/admin/v1/admins/password_mgmt', {}) return list(iterator) @@ -2996,8 +2981,8 @@ def get_external_password_mgmt_status_for_admin(self, admin_id): return response def update_admin_password_mgmt_status( - self, admin_id, has_external_password_mgmt=None, - password=None): + self, admin_id, has_external_password_mgmt=None, + password=None): """ Enable or disable an admin for external password management, and optionally set the password for an admin @@ -3048,7 +3033,7 @@ def update_logo(self, logo): Raises RuntimeError on error. """ params = { - 'logo': base64.b64encode(logo).decode(), + 'logo': base64.b64encode(logo).decode(), } return self.json_api_call('POST', '/admin/v1/logo', params) @@ -3197,9 +3182,9 @@ def get_bypass_codes_generator(self): Returns a generator yielding bypass codes. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/bypass_codes', - {} + 'GET', + '/admin/v1/bypass_codes', + {} ) def get_bypass_codes(self, limit=None, offset=0): @@ -3218,9 +3203,9 @@ def get_bypass_codes(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/bypass_codes', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/bypass_codes', + {'limit': limit, 'offset': offset} ) return list(self.get_bypass_codes_generator()) @@ -3252,19 +3237,19 @@ def sync_user(self, username, directory_key): Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } directory_key = six.moves.urllib.parse.quote_plus(directory_key) path = ( - '/admin/v1/users/directorysync/{directory_key}/syncuser').format( + '/admin/v1/users/directorysync/{directory_key}/syncuser').format( directory_key=directory_key) return self.json_api_call('POST', path, params) def get_trust_monitor_events_iterator( - self, - mintime, - maxtime, - event_type=None, + self, + mintime, + maxtime, + event_type=None, ): """ Returns a generator which yields trust monitor events. @@ -3287,27 +3272,27 @@ def get_trust_monitor_events_iterator( """ params = { - "mintime": "{}".format(mintime), - "maxtime": "{}".format(maxtime), + "mintime": "{}".format(mintime), + "maxtime": "{}".format(maxtime), } if event_type is not None: params["type"] = event_type return self.json_cursor_api_call( - "GET", - "/admin/v1/trust_monitor/events", - params, - lambda resp: resp["events"], + "GET", + "/admin/v1/trust_monitor/events", + params, + lambda resp: resp["events"], ) def get_trust_monitor_events_by_offset( - self, - mintime, - maxtime, - limit=None, - offset=None, - event_type=None, + self, + mintime, + maxtime, + limit=None, + offset=None, + event_type=None, ): """ Fetch Duo Trust Monitor Events from the Admin API. @@ -3334,8 +3319,8 @@ def get_trust_monitor_events_by_offset( """ params = { - "mintime": "{}".format(mintime), - "maxtime": "{}".format(maxtime), + "mintime": "{}".format(mintime), + "maxtime": "{}".format(maxtime), } if limit is not None: @@ -3348,9 +3333,9 @@ def get_trust_monitor_events_by_offset( params["type"] = event_type return self.json_api_call( - "GET", - "/admin/v1/trust_monitor/events", - params, + "GET", + "/admin/v1/trust_monitor/events", + params, ) def _quote_policy_id(self, policy_key): @@ -3363,9 +3348,9 @@ def get_policies_v2_iterator(self): """ return self.json_paging_api_call( - "GET", - "/admin/v2/policies", - {}, + "GET", + "/admin/v2/policies", + {}, ) def get_policies_v2(self, limit=None, offset=0): @@ -3381,9 +3366,9 @@ def get_policies_v2(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - "GET", - "/admin/v2/policies", - {"limit": limit, "offset": offset}, + "GET", + "/admin/v2/policies", + {"limit": limit, "offset": offset}, ) return list(self.get_policies_v2_iterator()) @@ -3411,8 +3396,8 @@ def update_policy_v2(self, policy_key, json_request): path = "/admin/v2/policies/" + self._quote_policy_id(policy_key) response = self.json_api_call("PUT", path, json_request) return response - - def update_policies_v2(self, sections, sections_to_delete, + + def update_policies_v2(self, sections, sections_to_delete, edit_list, edit_all_policies=False): """ Update the contents of multiple policies. @@ -3428,14 +3413,14 @@ def update_policies_v2(self, sections, sections_to_delete, """ path = "/admin/v2/policies/update" params = { - "policies_to_update": { - "edit_all_policies": edit_all_policies, - "edit_list": edit_list, - }, - "policy_changes": { - "sections": sections, - "sections_to_delete": sections_to_delete, - }, + "policies_to_update": { + "edit_all_policies": edit_all_policies, + "edit_list": edit_list, + }, + "policy_changes": { + "sections": sections, + "sections_to_delete": sections_to_delete, + }, } response = self.json_api_call("PUT", path, params) return response @@ -3450,7 +3435,7 @@ def create_policy_v2(self, json_request): path = "/admin/v2/policies" response = self.json_api_call("POST", path, json_request) return response - + def copy_policy_v2(self, policy_key, new_policy_names_list): """ Copy policy to multiple new policies. @@ -3464,8 +3449,8 @@ def copy_policy_v2(self, policy_key, new_policy_names_list): """ path = "/admin/v2/policies/copy" params = { - "policy_key": policy_key, - "new_policy_names_list": new_policy_names_list + "policy_key": policy_key, + "new_policy_names_list": new_policy_names_list } response = self.json_api_call("POST", path, params) return response @@ -3480,7 +3465,7 @@ def get_policy_v2(self, policy_key): path = "/admin/v2/policies/" + self._quote_policy_id(policy_key) response = self.json_api_call("GET", path, {}) return response - + def get_policy_summary_v2(self): """ Returns (dict) - summary of all policies and the applications @@ -3503,17 +3488,17 @@ def __init__(self, account_id, child_api_host=None, **kwargs): See the Client base class for other parameters. """ if not child_api_host: - child_api_host = Accounts.child_map.get(account_id, None) + child_api_host = Accounts.child_map.get(account_id, None) if child_api_host is None: child_api_host = kwargs.get('host') try: accounts_api = Accounts(**kwargs) accounts_api.get_child_accounts() - child_api_host = Accounts.child_map.get(account_id, kwargs['host']) + child_api_host = Accounts.child_map.get(account_id, kwargs['host']) except RuntimeError: pass kwargs['host'] = child_api_host - + super(AccountAdmin, self).__init__(**kwargs) self.account_id = account_id @@ -3541,7 +3526,7 @@ def set_edition(self, edition): Raises RuntimeError on error. """ params = { - 'edition': edition, + 'edition': edition, } return self.json_api_call('POST', @@ -3557,8 +3542,9 @@ def get_telephony_credits(self): Raises RuntimeError on error. """ return self.json_api_call('GET', - '/admin/v1/billing/telephony_credits', - params={}) + '/admin/v1/billing/telephony_credits', + params={}) + def set_telephony_credits(self, credits): """ @@ -3572,8 +3558,8 @@ def set_telephony_credits(self, credits): Raises RuntimeError on error. """ params = { - 'credits': str(credits), + 'credits': str(credits), } return self.json_api_call('POST', - '/admin/v1/billing/telephony_credits', - params) + '/admin/v1/billing/telephony_credits', + params) diff --git a/examples/report_user_by_email.py b/examples/report_user_by_email.py index d8e135a..8704772 100755 --- a/examples/report_user_by_email.py +++ b/examples/report_user_by_email.py @@ -35,7 +35,13 @@ def main(): # Retrieve user info from API: email_address = get_next_arg('E-mail address of user to retrieve: ') - user = admin_api.get_user_by_email(email_address) + req_params = {"email": email_address} + # There is no get_user_by_email in the duo_client_python library, so we use the generic api_call + user = admin_api.json_api_call( + method='GET', + path='/admin/v1/users', + params=req_params + ) if user: pprint(user, indent=2) From d436244baa823f10705795ef4afdb5f8e3584d73 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Tue, 12 Dec 2023 09:12:18 -0500 Subject: [PATCH 4/7] feat: add get_user_by_email() method to admin.py doc: add report_user_by_email.py to examples --- duo_client/admin.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/duo_client/admin.py b/duo_client/admin.py index 884c6c3..8e44953 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -728,6 +728,24 @@ def get_user_by_id(self, user_id): response = self.json_api_call('GET', path, {}) return response + def get_user_by_email(self, email): + """ + Returns user specified by email. + + email - User to fetch + + Returns user object. + + Raises RuntimeError on error. + """ + params = { + 'email': email, + } + response = self.json_api_call('GET', + '/admin/v1/users', + params) + return response + def get_users_by_name(self, username): """ Returns user specified by username. From 9a398af29df293f1e985b42e03d1658f96e1fe10 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Tue, 12 Dec 2023 11:56:31 -0500 Subject: [PATCH 5/7] doc: update report_user_by_email.py example to use get_user_by_email() method instead of generic json_api_call() method. --- examples/report_user_by_email.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/examples/report_user_by_email.py b/examples/report_user_by_email.py index 8704772..6c971a3 100755 --- a/examples/report_user_by_email.py +++ b/examples/report_user_by_email.py @@ -5,7 +5,6 @@ from __future__ import absolute_import, print_function import sys import getpass -from pprint import pprint import duo_client from six.moves import input @@ -35,16 +34,10 @@ def main(): # Retrieve user info from API: email_address = get_next_arg('E-mail address of user to retrieve: ') - req_params = {"email": email_address} - # There is no get_user_by_email in the duo_client_python library, so we use the generic api_call - user = admin_api.json_api_call( - method='GET', - path='/admin/v1/users', - params=req_params - ) + user = admin_api.get_user_by_email(email_address) if user: - pprint(user, indent=2) + print(user) else: print(f"User with email [{email_address}] could not be found.") From 7a81bd77bfe479f1c0b2c368215a0cf3be0517d9 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Fri, 15 Dec 2023 12:38:35 -0500 Subject: [PATCH 6/7] chore: reorganize examples into client specific folders and add Auth API examples for user authentication --- .../create_integration_sso_generic.py | 0 .../{ => Admin API}/create_user_and_phone.py | 0 .../get_billing_and_telephony_credits.py | 0 examples/{ => Admin API}/log_examples.py | 0 examples/{ => Admin API}/policies.py | 0 .../report_auths_by_country.py | 0 .../{ => Admin API}/report_user_by_email.py | 0 .../report_users_and_phones.py | 0 .../{ => Admin API}/trust_monitor_events.py | 0 examples/Auth API/async_basic_user_mfa.py | 96 ++++++++++++++++++ examples/Auth API/basic_user_mfa.py | 99 +++++++++++++++++++ examples/Auth API/basic_user_mfa_token.py | 93 +++++++++++++++++ 12 files changed, 288 insertions(+) rename examples/{ => Admin API}/create_integration_sso_generic.py (100%) rename examples/{ => Admin API}/create_user_and_phone.py (100%) rename examples/{ => Admin API}/get_billing_and_telephony_credits.py (100%) rename examples/{ => Admin API}/log_examples.py (100%) rename examples/{ => Admin API}/policies.py (100%) rename examples/{ => Admin API}/report_auths_by_country.py (100%) rename examples/{ => Admin API}/report_user_by_email.py (100%) rename examples/{ => Admin API}/report_users_and_phones.py (100%) rename examples/{ => Admin API}/trust_monitor_events.py (100%) create mode 100644 examples/Auth API/async_basic_user_mfa.py create mode 100644 examples/Auth API/basic_user_mfa.py create mode 100644 examples/Auth API/basic_user_mfa_token.py diff --git a/examples/create_integration_sso_generic.py b/examples/Admin API/create_integration_sso_generic.py similarity index 100% rename from examples/create_integration_sso_generic.py rename to examples/Admin API/create_integration_sso_generic.py diff --git a/examples/create_user_and_phone.py b/examples/Admin API/create_user_and_phone.py similarity index 100% rename from examples/create_user_and_phone.py rename to examples/Admin API/create_user_and_phone.py diff --git a/examples/get_billing_and_telephony_credits.py b/examples/Admin API/get_billing_and_telephony_credits.py similarity index 100% rename from examples/get_billing_and_telephony_credits.py rename to examples/Admin API/get_billing_and_telephony_credits.py diff --git a/examples/log_examples.py b/examples/Admin API/log_examples.py similarity index 100% rename from examples/log_examples.py rename to examples/Admin API/log_examples.py diff --git a/examples/policies.py b/examples/Admin API/policies.py similarity index 100% rename from examples/policies.py rename to examples/Admin API/policies.py diff --git a/examples/report_auths_by_country.py b/examples/Admin API/report_auths_by_country.py similarity index 100% rename from examples/report_auths_by_country.py rename to examples/Admin API/report_auths_by_country.py diff --git a/examples/report_user_by_email.py b/examples/Admin API/report_user_by_email.py similarity index 100% rename from examples/report_user_by_email.py rename to examples/Admin API/report_user_by_email.py diff --git a/examples/report_users_and_phones.py b/examples/Admin API/report_users_and_phones.py similarity index 100% rename from examples/report_users_and_phones.py rename to examples/Admin API/report_users_and_phones.py diff --git a/examples/trust_monitor_events.py b/examples/Admin API/trust_monitor_events.py similarity index 100% rename from examples/trust_monitor_events.py rename to examples/Admin API/trust_monitor_events.py diff --git a/examples/Auth API/async_basic_user_mfa.py b/examples/Auth API/async_basic_user_mfa.py new file mode 100644 index 0000000..77da9b3 --- /dev/null +++ b/examples/Auth API/async_basic_user_mfa.py @@ -0,0 +1,96 @@ +""" +Example of Duo Auth API user authentication using asynchronous resquest/response methods +""" + +import duo_client +import os +import sys +import getpass + + +argv_iter = iter(sys.argv[1:]) + + +def _get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials() -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Admin API ikey, skey and hostname strings + """ + + ikey = _get_next_arg('Duo Admin API integration key ("DI..."): ') + skey = _get_next_arg('Duo Admin API integration secret key: ', secure=True) + host = _get_next_arg('Duo Admin API hostname ("api-....duosecurity.com"): ') + username = _get_next_arg('Duo Username: ') + + return {"USERNAME": username, "IKEY": ikey, "SKEY": skey, "APIHOST": host} + + +def main(): + """Main program entry point""" + + inputs = prompt_for_credentials() + + auth_client = duo_client.Auth( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'] + ) + + # Verify that the Duo service is available + duo_ping = auth_client.ping() + if 'time' in duo_ping: + print("\nDuo service check completed successfully.") + else: + print(f"Error: {duo_ping}") + + # Verify that IKEY and SKEY information provided are valid + duo_check= auth_client.check() + if 'time' in duo_check: + print("IKEY and SKEY provided have been verified.") + else: + print(f"Error: {duo_check}") + + # Execute pre-authentication for given user + print(f"\nExecuting pre-authentication for {inputs['USERNAME']}...") + pre_auth = auth_client.preauth(username=inputs['USERNAME']) + + if pre_auth['result'] == "auth": + try: + print(f"Executing authentication action for {inputs['USERNAME']}...") + auth = auth_client.auth(factor="push", username=inputs['USERNAME'], device="auto", async_txn=True) + if 'txid' in auth: + waiting = True + # Collect the authentication result + print("Getting authentication result...") + # Repeat long polling for async authentication status until no longer in a 'waiting' state + while waiting is True: + # Poll Duo Auth API for the status of the async authentication based upon transaction ID + auth_status = auth_client.auth_status(auth['txid']) + print(f"Auth status: {auth_status}") + if auth_status['waiting'] is not True: + # Waiting for response too async authentication is no longer 'True', so break the loop + waiting = False + # Parse response for the 'status' dictionary key to determine whether to allow or deny + print(auth_status) + else: + # Some kind of unexpected error occurred + print(f"Error: an unknown error occurred attempting authentication for [{inputs['USERNAME']}]") + except Exception as e_str: + print(e_str) + else: + print(pre_auth['status_msg']) + + +if __name__ == '__main__': + main() diff --git a/examples/Auth API/basic_user_mfa.py b/examples/Auth API/basic_user_mfa.py new file mode 100644 index 0000000..8e84984 --- /dev/null +++ b/examples/Auth API/basic_user_mfa.py @@ -0,0 +1,99 @@ +""" +Example of Duo Auth API uaer authentication with synchronous request/response +""" + +import duo_client +import os +import sys +import getpass + +from pprint import pprint + + +argv_iter = iter(sys.argv[1:]) + + +def _get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials() -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Admin API ikey, skey and hostname strings + """ + + ikey = _get_next_arg('Duo Admin API integration key ("DI..."): ') + skey = _get_next_arg('Duo Admin API integration secret key: ', secure=True) + host = _get_next_arg('Duo Admin API hostname ("api-....duosecurity.com"): ') + username = _get_next_arg('Duo Username: ') + + return {"USERNAME": username, "IKEY": ikey, "SKEY": skey, "APIHOST": host} + + +def main(): + """Main program entry point""" + + inputs = prompt_for_credentials() + + auth_client = duo_client.Auth( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'] + ) + + # Verify that the Duo service is available + duo_ping = auth_client.ping() + if 'time' in duo_ping: + print("\nDuo service check completed successfully.") + else: + print(f"Error: {duo_ping}") + + # Verify that IKEY and SKEY information provided are valid + duo_check= auth_client.check() + if 'time' in duo_check: + print("IKEY and SKEY provided have been verified.") + else: + print(f"Error: {duo_check}") + + # Execute pre-authentication for given user + print(f"\nExecuting pre-authentication for {inputs['USERNAME']}...") + pre_auth = auth_client.preauth(username=inputs['USERNAME']) + + if pre_auth['result'] == "auth": + try: + # User exists and has an MFA device enrolled + print(f"Executing authentication action for {inputs['USERNAME']}...") + # "auto" is selected for the factor in this example, however the pre_auth['devices'] dictionary + # element contains a list of factors available for the provided user, if an alternate method is desired + auth = auth_client.auth(factor="auto", username=inputs['USERNAME'], device="auto") + print(f"\n{auth['status_msg']}") + except Exception as e_str: + print(e_str) + elif pre_auth['result'] == "allow": + # User is in bypass mode + print(pre_auth['status_msg']) + elif pre_auth['result'] == "enroll": + # User is unknown and not enrolled in Duo with a 'New User' policy setting of 'Require Enrollment' + # Setting a 'New User' policy to 'Require Enrollment' should only be done for Group level policies where + # the intent is to capture "partially enrolled" users. "Parially enrolled" users are those that Duo has a + # defined username for but does not have an MFA device enrolled. + print("Please enroll in Duo using the following URL.") + print(pre_auth['enroll_portal_url']) + elif pre_auth['result'] == "deny": + # User is denied by policy setting + print(pre_auth['status_msg']) + else: + print("Error: an unexpected error occurred") + print(pre_auth) + + +if __name__ == '__main__': + main() diff --git a/examples/Auth API/basic_user_mfa_token.py b/examples/Auth API/basic_user_mfa_token.py new file mode 100644 index 0000000..d376628 --- /dev/null +++ b/examples/Auth API/basic_user_mfa_token.py @@ -0,0 +1,93 @@ +""" +Example of Duo Auth API uaer authentication with synchronous request/response using an assigned token +as the MFA factor +""" + +import duo_client +import os +import sys +import getpass + +from pprint import pprint + + +argv_iter = iter(sys.argv[1:]) + + +def _get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials() -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Admin API ikey, skey and hostname strings + """ + + ikey = _get_next_arg('Duo Admin API integration key ("DI..."): ') + skey = _get_next_arg('Duo Admin API integration secret key: ', secure=True) + host = _get_next_arg('Duo Admin API hostname ("api-....duosecurity.com"): ') + username = _get_next_arg('Duo Username: ') + + return {"USERNAME": username, "IKEY": ikey, "SKEY": skey, "APIHOST": host} + + +def main(): + """Main program entry point""" + + inputs = prompt_for_credentials() + + auth_client = duo_client.Auth( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'] + ) + + # Verify that the Duo service is available + duo_ping = auth_client.ping() + if 'time' in duo_ping: + print("\nDuo service check completed successfully.") + else: + print(f"Error: {duo_ping}") + + # Verify that IKEY and SKEY information provided are valid + duo_check= auth_client.check() + if 'time' in duo_check: + print("IKEY and SKEY provided have been verified.") + else: + print(f"Error: {duo_check}") + + # Execute pre-authentication for given user + print(f"\nExecuting pre-authentication for {inputs['USERNAME']}...") + pre_auth = auth_client.preauth(username=inputs['USERNAME']) + + print("\n" + "=" * 30) + pprint(f"Pre-Auth result: {pre_auth}") + print("=" * 30 + "\n") + + for device in pre_auth['devices']: + pprint(device) + print() + + if pre_auth['result'] == "auth": + try: + print(f"Executing authentication action for {inputs['USERNAME']}...") + # Prompt for the hardware token passcode + passcode = _get_next_arg('Duo token passcode: ') + auth = auth_client.auth(factor="passcode", username=inputs['USERNAME'], passcode=passcode) + print(f"\n{auth['status_msg']}") + except Exception as e_str: + print(e_str) + else: + print(pre_auth) + + +if __name__ == '__main__': + main() From 8a0747b6a09c2d811aea7bea072686cdd89083d3 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Mon, 18 Dec 2023 12:46:21 -0500 Subject: [PATCH 7/7] chore: reorganize examples into client specific folders. add Auth API examples for user authentication and Accounts API examples for managing child accounts --- examples/Accounts API/create_child_account.py | 63 +++++++++++++++++ examples/Accounts API/delete_child_account.py | 70 +++++++++++++++++++ .../Accounts API/retrieve_account_list.py | 64 +++++++++++++++++ examples/Auth API/async_basic_user_mfa.py | 8 +-- examples/Auth API/basic_user_mfa.py | 10 +-- examples/Auth API/basic_user_mfa_token.py | 8 +-- 6 files changed, 210 insertions(+), 13 deletions(-) create mode 100644 examples/Accounts API/create_child_account.py create mode 100644 examples/Accounts API/delete_child_account.py create mode 100644 examples/Accounts API/retrieve_account_list.py diff --git a/examples/Accounts API/create_child_account.py b/examples/Accounts API/create_child_account.py new file mode 100644 index 0000000..38e6966 --- /dev/null +++ b/examples/Accounts API/create_child_account.py @@ -0,0 +1,63 @@ +""" +Example of Duo Accounts API child account creation +""" + +import duo_client +import os +import sys +import getpass + +from pprint import pprint + + +argv_iter = iter(sys.argv[1:]) + + +def _get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials() -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Accounts API ikey, skey and hostname strings + """ + + ikey = _get_next_arg('Duo Accounts API integration key ("DI..."): ') + skey = _get_next_arg('Duo Accounts API integration secret key: ', secure=True) + host = _get_next_arg('Duo Accounts API hostname ("api-....duosecurity.com"): ') + account_name = _get_next_arg('Name for new child account: ') + + return {"IKEY": ikey, "SKEY": skey, "APIHOST": host, "ACCOUNT_NAME": account_name} + + +def main(): + """Main program entry point""" + + inputs = prompt_for_credentials() + + account_client = duo_client.Accounts( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'] + ) + + print(f"Creating child account with name [{inputs['ACCOUNT_NAME']}]") + child_account = account_client.create_account(inputs['ACCOUNT_NAME']) + + if 'account_id' in child_account: + print(f"Child account for [{inputs['ACCOUNT_NAME']}] created successfully.") + else: + print(f"An unexpected error occurred while creating child account for {inputs['ACCOUNT_NAME']}") + print(child_account) + + +if __name__ == '__main__': + main() diff --git a/examples/Accounts API/delete_child_account.py b/examples/Accounts API/delete_child_account.py new file mode 100644 index 0000000..07ea663 --- /dev/null +++ b/examples/Accounts API/delete_child_account.py @@ -0,0 +1,70 @@ +""" +Example of Duo Accounts API child account deletiom +""" + +import duo_client +import os +import sys +import getpass + +from pprint import pprint + + +argv_iter = iter(sys.argv[1:]) + + +def _get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials() -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Accounts API ikey, skey and hostname strings + """ + + ikey = _get_next_arg('Duo Accounts API integration key ("DI..."): ') + skey = _get_next_arg('Duo Accounts API integration secret key: ', secure=True) + host = _get_next_arg('Duo Accounts API hostname ("api-....duosecurity.com"): ') + account_id = _get_next_arg('ID of child account to delete: ') + + return {"IKEY": ikey, "SKEY": skey, "APIHOST": host, "ACCOUNT_ID": account_id} + + +def main(): + """Main program entry point""" + + inputs = prompt_for_credentials() + + account_client = duo_client.Accounts( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'] + ) + + account_name = None + child_account_list = account_client.get_child_accounts() + for account in child_account_list: + if account['account_id'] == inputs['ACCOUNT_ID']: + account_name = account['name'] + if account_name is None: + print(f"Unable to find account with ID [{inputs['ACCOUNT_ID']}]") + sys.exit() + + print(f"Deleting child account with name [{account_name}]") + deleted_account = account_client.delete_account(inputs['ACCOUNT_ID']) + if deleted_account == '': + print(f"Account {inputs['ACCOUNT_ID']} was deleted successfully.") + else: + print(f"An unexpected error occurred while deleting account [{account_name}: {deleted_account}]") + + +if __name__ == '__main__': + main() diff --git a/examples/Accounts API/retrieve_account_list.py b/examples/Accounts API/retrieve_account_list.py new file mode 100644 index 0000000..b8f1474 --- /dev/null +++ b/examples/Accounts API/retrieve_account_list.py @@ -0,0 +1,64 @@ +""" +Example of Duo account API uaer accountentication with synchronous request/response +""" + +import duo_client +import os +import sys +import getpass + +from pprint import pprint + + +argv_iter = iter(sys.argv[1:]) + + +def _get_next_arg(prompt, secure=False): + """Read information from STDIN, using getpass when sensitive information should not be echoed to tty""" + try: + return next(argv_iter) + except StopIteration: + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials() -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Accounts API ikey, skey and hostname strings + """ + + ikey = _get_next_arg('Duo Accounts API integration key ("DI..."): ') + skey = _get_next_arg('Duo Accounts API integration secret key: ', secure=True) + host = _get_next_arg('Duo Accounts API hostname ("api-....duosecurity.com"): ') + + return {"IKEY": ikey, "SKEY": skey, "APIHOST": host} + + +def main(): + """Main program entry point""" + + inputs = prompt_for_credentials() + + account_client = duo_client.Accounts( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'] + ) + + child_accounts = account_client.get_child_accounts() + + if isinstance(child_accounts, list): + # Expected list of child accounts returned + for child_account in child_accounts: + print(child_account) + + if isinstance(child_accounts, dict): + # Non-successful response returned + print(child_accounts) + + +if __name__ == '__main__': + main() diff --git a/examples/Auth API/async_basic_user_mfa.py b/examples/Auth API/async_basic_user_mfa.py index 77da9b3..10b7106 100644 --- a/examples/Auth API/async_basic_user_mfa.py +++ b/examples/Auth API/async_basic_user_mfa.py @@ -25,12 +25,12 @@ def _get_next_arg(prompt, secure=False): def prompt_for_credentials() -> dict: """Collect required API credentials from command line prompts - :return: dictionary containing Duo Admin API ikey, skey and hostname strings + :return: dictionary containing Duo Auth API ikey, skey and hostname strings """ - ikey = _get_next_arg('Duo Admin API integration key ("DI..."): ') - skey = _get_next_arg('Duo Admin API integration secret key: ', secure=True) - host = _get_next_arg('Duo Admin API hostname ("api-....duosecurity.com"): ') + ikey = _get_next_arg('Duo Auth API integration key ("DI..."): ') + skey = _get_next_arg('Duo Auth API integration secret key: ', secure=True) + host = _get_next_arg('Duo Auth API hostname ("api-....duosecurity.com"): ') username = _get_next_arg('Duo Username: ') return {"USERNAME": username, "IKEY": ikey, "SKEY": skey, "APIHOST": host} diff --git a/examples/Auth API/basic_user_mfa.py b/examples/Auth API/basic_user_mfa.py index 8e84984..31604cc 100644 --- a/examples/Auth API/basic_user_mfa.py +++ b/examples/Auth API/basic_user_mfa.py @@ -27,12 +27,12 @@ def _get_next_arg(prompt, secure=False): def prompt_for_credentials() -> dict: """Collect required API credentials from command line prompts - :return: dictionary containing Duo Admin API ikey, skey and hostname strings + :return: dictionary containing Duo Auth API ikey, skey and hostname strings """ - ikey = _get_next_arg('Duo Admin API integration key ("DI..."): ') - skey = _get_next_arg('Duo Admin API integration secret key: ', secure=True) - host = _get_next_arg('Duo Admin API hostname ("api-....duosecurity.com"): ') + ikey = _get_next_arg('Duo Auth API integration key ("DI..."): ') + skey = _get_next_arg('Duo Auth API integration secret key: ', secure=True) + host = _get_next_arg('Duo Auth API hostname ("api-....duosecurity.com"): ') username = _get_next_arg('Duo Username: ') return {"USERNAME": username, "IKEY": ikey, "SKEY": skey, "APIHOST": host} @@ -84,7 +84,7 @@ def main(): # User is unknown and not enrolled in Duo with a 'New User' policy setting of 'Require Enrollment' # Setting a 'New User' policy to 'Require Enrollment' should only be done for Group level policies where # the intent is to capture "partially enrolled" users. "Parially enrolled" users are those that Duo has a - # defined username for but does not have an MFA device enrolled. + # defined username but does not have an MFA device enrolled. print("Please enroll in Duo using the following URL.") print(pre_auth['enroll_portal_url']) elif pre_auth['result'] == "deny": diff --git a/examples/Auth API/basic_user_mfa_token.py b/examples/Auth API/basic_user_mfa_token.py index d376628..8632085 100644 --- a/examples/Auth API/basic_user_mfa_token.py +++ b/examples/Auth API/basic_user_mfa_token.py @@ -28,12 +28,12 @@ def _get_next_arg(prompt, secure=False): def prompt_for_credentials() -> dict: """Collect required API credentials from command line prompts - :return: dictionary containing Duo Admin API ikey, skey and hostname strings + :return: dictionary containing Duo Auth API ikey, skey and hostname strings """ - ikey = _get_next_arg('Duo Admin API integration key ("DI..."): ') - skey = _get_next_arg('Duo Admin API integration secret key: ', secure=True) - host = _get_next_arg('Duo Admin API hostname ("api-....duosecurity.com"): ') + ikey = _get_next_arg('Duo Auth API integration key ("DI..."): ') + skey = _get_next_arg('Duo Auth API integration secret key: ', secure=True) + host = _get_next_arg('Duo Auth API hostname ("api-....duosecurity.com"): ') username = _get_next_arg('Duo Username: ') return {"USERNAME": username, "IKEY": ikey, "SKEY": skey, "APIHOST": host}