From bbc41b329850f391358265b2b6df3eea4de16d18 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Wed, 19 Mar 2025 12:45:29 -0400 Subject: [PATCH] feat: add support for managing blocked devices in admin API Added methods to block, unblock, and fetch blocked devices via the admin API. Updated test cases to validate the new functionality, including support for pagination and operations by device ID. --- duo_client/admin.py | 117 ++++++++++++++++++++++++- tests/admin/test_registered_devices.py | 116 ++++++++++++++++++++++-- 2 files changed, 227 insertions(+), 6 deletions(-) diff --git a/duo_client/admin.py b/duo_client/admin.py index 491c60c..d95006e 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -178,8 +178,8 @@ import time import urllib.parse import warnings -from typing import List, Optional from datetime import datetime, timedelta, timezone +from typing import List, Optional from . import Accounts, client from .logs.telephony import Telephony @@ -2743,6 +2743,121 @@ def delete_registered_device(self, registered_device_id): params = {} return self.json_api_call('DELETE', path, params) + def get_blocked_devices_generator(self): + """ + Returns a generator yielding Duo Desktop blocked registered devices. + """ + return self.json_paging_api_call('GET', '/admin/v1/registered_devices/blocked', {}) + + def get_blocked_devices(self, limit=None, offset=0): + """ + Retrieves a list of Duo Desktop blocked registered devices. + + Args: + limit: The max number of registered devices to fetch at once. [Default: None] + offset: If a 'limit' is passed, the offset to start retrieval. + [Default: 0] + + Returns: + list of blocked registered devices + + Raises: + RuntimeError on error. + + """ + (limit, offset) = self.normalize_paging_args(limit, offset) + if limit: + return self.json_api_call('GET', '/admin/v1/registered_devices/blocked', {'limit': limit, 'offset': offset}) + + return list(self.get_blocked_devices_generator()) + + def get_blocked_device_by_id(self, registered_device_id): + """ + Returns a Duo Desktop blocked registered device specified by registered_device_id (compkey). + + Args: + registered_device_id - Duo Desktop registered device compkey + + Returns: + registered device object. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked/' + registered_device_id + response = self.json_api_call('GET', path, {}) + return response + + def block_registered_devices(self, registered_device_key_list): + """ + Blocks devices from accessing any application protected by Duo policy that requires device registration. + + Args: + registered_device_key_list (list): A list of Duo Desktop registered device IDs (compkey). + + Returns: + a list of registered devices that were blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked' + params = {'registered_device_key_list': json.dumps(registered_device_key_list)} + response = self.json_api_call('POST', path, params) + return response + + def block_registered_device_by_id(self, compkey): + """ + Blocks a device from accessing any application protected by Duo policy that requires device registration. + + Args: + compkey (str): A Duo Desktop registered device ID (compkey). + + Returns: + the registered device object that was blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked/' + urllib.parse.quote_plus(compkey) + response = self.json_api_call('POST', path, {}) + return response + + def unblock_registered_devices(self, registered_device_key_list): + """ + Unblocks devices from accessing any application protected by Duo policy that requires device registration. + + Args: + registered_device_key_list (list): A list of Duo Desktop registered device IDs (compkey). + + Returns: + a list of registered devices that were blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked' + params = {'registered_device_key_list': json.dumps(registered_device_key_list)} + response = self.json_api_call('DELETE', path, params) + return response + + def unblock_registered_device_by_id(self, compkey): + """ + Unblocks a device from accessing any application protected by Duo policy that requires device registration. + + Args: + compkey (str): A Duo Desktop registered device ID (compkey). + + Returns: + the registered device object that was blocked. + + Raises: + RuntimeError on error. + """ + path = '/admin/v1/registered_devices/blocked/' + urllib.parse.quote_plus(compkey) + response = self.json_api_call('DELETE', path, {}) + return response + def get_secret_key(self, integration_key): """Returns the secret key of the specified integration. diff --git a/tests/admin/test_registered_devices.py b/tests/admin/test_registered_devices.py index 2caf026..4d5ae92 100644 --- a/tests/admin/test_registered_devices.py +++ b/tests/admin/test_registered_devices.py @@ -1,10 +1,12 @@ +import json + from .base import TestAdmin from .. import util class TestRegisteredDevices(TestAdmin): def test_get_registered_devices_generator(self): - """ Test to get desktop tokens generator. + """ Test to get registered devices generator. """ generator = self.client_list.get_registered_devices_generator() response = next(generator) @@ -16,7 +18,7 @@ def test_get_registered_devices_generator(self): {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }) def test_get_registered_devices(self): - """ Test to get desktop tokens without params. + """ Test to get registered devices without params. """ response = self.client_list.get_registered_devices()[0] uri, args = response['uri'].split('?') @@ -27,7 +29,7 @@ def test_get_registered_devices(self): {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }) def test_get_registered_devices_limit(self): - """ Test to get desktop tokens with limit. + """ Test to get registered devices with limit. """ response = self.client_list.get_registered_devices(limit='20')[0] uri, args = response['uri'].split('?') @@ -38,7 +40,7 @@ def test_get_registered_devices_limit(self): {'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['0'], }) def test_get_registered_devices_offset(self): - """ Test to get desktop tokens with offset. + """ Test to get registered devices with offset. """ response = self.client_list.get_registered_devices(offset='20')[0] uri, args = response['uri'].split('?') @@ -49,7 +51,7 @@ def test_get_registered_devices_offset(self): {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }) def test_get_registered_devices_limit_offset(self): - """ Test to get desktop tokens with limit and offset. + """ Test to get registered devices with limit and offset. """ response = self.client_list.get_registered_devices(limit='20', offset='2')[0] uri, args = response['uri'].split('?') @@ -68,3 +70,107 @@ def test_delete_registered_device(self): self.assertEqual(response['method'], 'DELETE') self.assertEqual(uri, '/admin/v1/registered_devices/CRSFWW1YWVNUXMBJ1J29') self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) + + def test_get_blocked_devices_generator(self): + """ Test to get blocked devices generator. + """ + generator = self.client_list.get_blocked_devices_generator() + response = next(generator) + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], } + ) + + def test_get_blocked_devices(self): + """ Test to get blocked devices without params. + """ + response = self.client_list.get_blocked_devices()[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], } + ) + + def test_get_blocked_devices_limit(self): + """ Test to get blocked devices with limit. + """ + response = self.client_list.get_blocked_devices(limit='20')[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['0'], } + ) + + def test_get_blocked_devices_offset(self): + """ Test to get blocked devices with offset. + """ + response = self.client_list.get_blocked_devices(offset='20')[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], } + ) + + def test_get_blocked_devices_limit_offset(self): + """ Test to get blocked devices with limit and offset. + """ + response = self.client_list.get_blocked_devices(limit='20', offset='2')[0] + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual( + util.params_to_dict(args), + {'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['2'], } + ) + + def test_block_registered_device_by_id(self): + """ Test to block registered device by registered device id. + """ + response = self.client.block_registered_device_by_id('CRSFWW1YWVNUXMBJ1J29') + + self.assertEqual(response['method'], 'POST') + self.assertEqual(response['uri'], '/admin/v1/registered_devices/blocked/CRSFWW1YWVNUXMBJ1J29') + self.assertEqual(json.loads(response['body'])['account_id'], self.client.account_id) + + def test_block_registered_devices(self): + """ Test to block registered devices. + """ + response = self.client.block_registered_devices(['CRSFWW1YWVNUXMBJ1J29', 'CRSFWW1YWVNUXMBJ1J30']) + + self.assertEqual(response['method'], 'POST') + self.assertEqual(response['uri'], '/admin/v1/registered_devices/blocked') + self.assertEqual(json.loads(response['body'])['account_id'], self.client.account_id) + + def test_unblock_registered_device_by_id(self): + """ Test to unblock registered device by registered device id. + """ + response = self.client.unblock_registered_device_by_id('CRSFWW1YWVNUXMBJ1J29') + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'DELETE') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked/CRSFWW1YWVNUXMBJ1J29') + self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) + + def test_unblock_registered_devices(self): + """ Test to unblock registered devices. + """ + response = self.client.unblock_registered_devices(['CRSFWW1YWVNUXMBJ1J29', 'CRSFWW1YWVNUXMBJ1J30']) + uri, args = response['uri'].split('?') + + self.assertEqual(response['method'], 'DELETE') + self.assertEqual(uri, '/admin/v1/registered_devices/blocked') + self.assertEqual(util.params_to_dict(args)['account_id'], [self.client.account_id])