diff --git a/duo_client/admin.py b/duo_client/admin.py index 491c60c..d95006e 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -178,8 +178,8 @@ import time import urllib.parse import warnings -from typing import List, Optional from datetime import datetime, timedelta, timezone +from typing import List, Optional from . import Accounts, client from .logs.telephony import Telephony @@ -2743,6 +2743,121 @@ def delete_registered_device(self, registered_device_id): params = {} return self.json_api_call('DELETE', path, params) + def get_blocked_devices_generator(self): + """ + Returns a generator yielding Duo Desktop blocked registered devices. + """ + return self.json_paging_api_call('GET', '/admin/v1/registered_devices/blocked', {}) + + def get_blocked_devices(self, limit=None, offset=0): + """ + Retrieves a list of Duo Desktop blocked registered devices. + + Args: + limit: The max number of registered devices to fetch at once. [Default: None] + offset: If a 'limit' is passed, the offset to start retrieval. + [Default: 0] + + Returns: + list of blocked registered devices + + Raises: + RuntimeError on error. + + """ + (limit, offset) = self.normalize_paging_args(limit, offset) + if limit: + return self.json_api_call('GET', '/admin/v1/registered_devices/blocked', {'limit': limit, 'offset': offset}) + + return list(self.get_blocked_devices_generator()) + + def get_blocked_device_by_id(self, registered_device_id): + """ + Returns a Duo Desktop blocked registered device specified by registered_device_id (compkey). + + Args: + registered_device_id - Duo Desktop registered device compkey + + Returns: + registered device object. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked/' + registered_device_id + response = self.json_api_call('GET', path, {}) + return response + + def block_registered_devices(self, registered_device_key_list): + """ + Blocks devices from accessing any application protected by Duo policy that requires device registration. + + Args: + registered_device_key_list (list): A list of Duo Desktop registered device IDs (compkey). + + Returns: + a list of registered devices that were blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked' + params = {'registered_device_key_list': json.dumps(registered_device_key_list)} + response = self.json_api_call('POST', path, params) + return response + + def block_registered_device_by_id(self, compkey): + """ + Blocks a device from accessing any application protected by Duo policy that requires device registration. + + Args: + compkey (str): A Duo Desktop registered device ID (compkey). + + Returns: + the registered device object that was blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked/' + urllib.parse.quote_plus(compkey) + response = self.json_api_call('POST', path, {}) + return response + + def unblock_registered_devices(self, registered_device_key_list): + """ + Unblocks devices from accessing any application protected by Duo policy that requires device registration. + + Args: + registered_device_key_list (list): A list of Duo Desktop registered device IDs (compkey). + + Returns: + a list of registered devices that were blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked' + params = {'registered_device_key_list': json.dumps(registered_device_key_list)} + response = self.json_api_call('DELETE', path, params) + return response + + def unblock_registered_device_by_id(self, compkey): + """ + Unblocks a device from accessing any application protected by Duo policy that requires device registration. + + Args: + compkey (str): A Duo Desktop registered device ID (compkey). + + Returns: + the registered device object that was blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked/' + urllib.parse.quote_plus(compkey) + response = self.json_api_call('DELETE', path, {}) + return response + def get_secret_key(self, integration_key): """Returns the secret key of the specified integration. diff --git a/tests/admin/test_registered_devices.py b/tests/admin/test_registered_devices.py index 2caf026..4d5ae92 100644 --- a/tests/admin/test_registered_devices.py +++ b/tests/admin/test_registered_devices.py @@ -1,10 +1,12 @@ +import json + from .base import TestAdmin from .. import util class TestRegisteredDevices(TestAdmin): def test_get_registered_devices_generator(self): - """ Test to get desktop tokens generator. + """ Test to get registered devices generator. """ generator = self.client_list.get_registered_devices_generator() response = next(generator) @@ -16,7 +18,7 @@ def test_get_registered_devices_generator(self): {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }) def test_get_registered_devices(self): - """ Test to get desktop tokens without params. + """ Test to get registered devices without params. """ response = self.client_list.get_registered_devices()[0] uri, args = response['uri'].split('?') @@ -27,7 +29,7 @@ def test_get_registered_devices(self): {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }) def test_get_registered_devices_limit(self): - """ Test to get desktop tokens with limit. + """ Test to get registered devices with limit. """ response = self.client_list.get_registered_devices(limit='20')[0] uri, args = response['uri'].split('?') @@ -38,7 +40,7 @@ def test_get_registered_devices_limit(self): {'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['0'], }) def test_get_registered_devices_offset(self): - """ Test to get desktop tokens with offset. + """ Test to get registered devices with offset. """ response = self.client_list.get_registered_devices(offset='20')[0] uri, args = response['uri'].split('?') @@ -49,7 +51,7 @@ def test_get_registered_devices_offset(self): {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }) def test_get_registered_devices_limit_offset(self): - """ Test to get desktop tokens with limit and offset. + """ Test to get registered devices with limit and offset. """ response = self.client_list.get_registered_devices(limit='20', offset='2')[0] uri, args = response['uri'].split('?') @@ -68,3 +70,107 @@ def test_delete_registered_device(self): self.assertEqual(response['method'], 'DELETE') self.assertEqual(uri, '/admin/v1/registered_devices/CRSFWW1YWVNUXMBJ1J29') self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) + + def test_get_blocked_devices_generator(self): + """ Test to get blocked devices generator. + """ + generator = self.client_list.get_blocked_devices_generator() + response = next(generator) + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], } + ) + + def test_get_blocked_devices(self): + """ Test to get blocked devices without params. + """ + response = self.client_list.get_blocked_devices()[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], } + ) + + def test_get_blocked_devices_limit(self): + """ Test to get blocked devices with limit. + """ + response = self.client_list.get_blocked_devices(limit='20')[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['0'], } + ) + + def test_get_blocked_devices_offset(self): + """ Test to get blocked devices with offset. + """ + response = self.client_list.get_blocked_devices(offset='20')[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], } + ) + + def test_get_blocked_devices_limit_offset(self): + """ Test to get blocked devices with limit and offset. + """ + response = self.client_list.get_blocked_devices(limit='20', offset='2')[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['2'], } + ) + + def test_block_registered_device_by_id(self): + """ Test to block registered device by registered device id. + """ + response = self.client.block_registered_device_by_id('CRSFWW1YWVNUXMBJ1J29') + + self.assertEqual(response['method'], 'POST') + self.assertEqual(response['uri'], '/admin/v1/registered_devices/blocked/CRSFWW1YWVNUXMBJ1J29') + self.assertEqual(json.loads(response['body'])['account_id'], self.client.account_id) + + def test_block_registered_devices(self): + """ Test to block registered devices. + """ + response = self.client.block_registered_devices(['CRSFWW1YWVNUXMBJ1J29', 'CRSFWW1YWVNUXMBJ1J30']) + + self.assertEqual(response['method'], 'POST') + self.assertEqual(response['uri'], '/admin/v1/registered_devices/blocked') + self.assertEqual(json.loads(response['body'])['account_id'], self.client.account_id) + + def test_unblock_registered_device_by_id(self): + """ Test to unblock registered device by registered device id. + """ + response = self.client.unblock_registered_device_by_id('CRSFWW1YWVNUXMBJ1J29') + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'DELETE') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked/CRSFWW1YWVNUXMBJ1J29') + self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) + + def test_unblock_registered_devices(self): + """ Test to unblock registered devices. + """ + response = self.client.unblock_registered_devices(['CRSFWW1YWVNUXMBJ1J29', 'CRSFWW1YWVNUXMBJ1J30']) + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'DELETE') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual(util.params_to_dict(args)['account_id'], [self.client.account_id])