Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 116 additions & 1 deletion duo_client/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@
import time
import urllib.parse
import warnings
from typing import List, Optional
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from . import Accounts, client
from .logs.telephony import Telephony
Expand Down Expand Up @@ -2743,6 +2743,121 @@ def delete_registered_device(self, registered_device_id):
params = {}
return self.json_api_call('DELETE', path, params)

def get_blocked_devices_generator(self):
"""
Returns a generator yielding Duo Desktop blocked registered devices.
"""
return self.json_paging_api_call('GET', '/admin/v1/registered_devices/blocked', {})

def get_blocked_devices(self, limit=None, offset=0):
"""
Retrieves a list of Duo Desktop blocked registered devices.

Args:
limit: The max number of registered devices to fetch at once. [Default: None]
offset: If a 'limit' is passed, the offset to start retrieval.
[Default: 0]

Returns:
list of blocked registered devices

Raises:
RuntimeError on error.

"""
(limit, offset) = self.normalize_paging_args(limit, offset)
if limit:
return self.json_api_call('GET', '/admin/v1/registered_devices/blocked', {'limit': limit, 'offset': offset})

return list(self.get_blocked_devices_generator())

def get_blocked_device_by_id(self, registered_device_id):
"""
Returns a Duo Desktop blocked registered device specified by registered_device_id (compkey).

Args:
registered_device_id - Duo Desktop registered device compkey

Returns:
registered device object.

Raises:
RuntimeError on error.
"""
path = '/admin/v1/registered_devices/blocked/' + registered_device_id
response = self.json_api_call('GET', path, {})
return response

def block_registered_devices(self, registered_device_key_list):
"""
Blocks devices from accessing any application protected by Duo policy that requires device registration.

Args:
registered_device_key_list (list): A list of Duo Desktop registered device IDs (compkey).

Returns:
a list of registered devices that were blocked.

Raises:
RuntimeError on error.
"""
path = '/admin/v1/registered_devices/blocked'
params = {'registered_device_key_list': json.dumps(registered_device_key_list)}
response = self.json_api_call('POST', path, params)
return response

def block_registered_device_by_id(self, compkey):
"""
Blocks a device from accessing any application protected by Duo policy that requires device registration.

Args:
compkey (str): A Duo Desktop registered device ID (compkey).

Returns:
the registered device object that was blocked.

Raises:
RuntimeError on error.
"""
path = '/admin/v1/registered_devices/blocked/' + urllib.parse.quote_plus(compkey)
response = self.json_api_call('POST', path, {})
return response

def unblock_registered_devices(self, registered_device_key_list):
"""
Unblocks devices from accessing any application protected by Duo policy that requires device registration.

Args:
registered_device_key_list (list): A list of Duo Desktop registered device IDs (compkey).

Returns:
a list of registered devices that were blocked.

Raises:
RuntimeError on error.
"""
path = '/admin/v1/registered_devices/blocked'
params = {'registered_device_key_list': json.dumps(registered_device_key_list)}
response = self.json_api_call('DELETE', path, params)
return response

def unblock_registered_device_by_id(self, compkey):
"""
Unblocks a device from accessing any application protected by Duo policy that requires device registration.

Args:
compkey (str): A Duo Desktop registered device ID (compkey).

Returns:
the registered device object that was blocked.

Raises:
RuntimeError on error.
"""
path = '/admin/v1/registered_devices/blocked/' + urllib.parse.quote_plus(compkey)
response = self.json_api_call('DELETE', path, {})
return response

def get_secret_key(self, integration_key):
"""Returns the secret key of the specified integration.

Expand Down
116 changes: 111 additions & 5 deletions tests/admin/test_registered_devices.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json

from .base import TestAdmin
from .. import util


class TestRegisteredDevices(TestAdmin):
def test_get_registered_devices_generator(self):
""" Test to get desktop tokens generator.
""" Test to get registered devices generator.
"""
generator = self.client_list.get_registered_devices_generator()
response = next(generator)
Expand All @@ -16,7 +18,7 @@ def test_get_registered_devices_generator(self):
{'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], })

def test_get_registered_devices(self):
""" Test to get desktop tokens without params.
""" Test to get registered devices without params.
"""
response = self.client_list.get_registered_devices()[0]
uri, args = response['uri'].split('?')
Expand All @@ -27,7 +29,7 @@ def test_get_registered_devices(self):
{'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], })

def test_get_registered_devices_limit(self):
""" Test to get desktop tokens with limit.
""" Test to get registered devices with limit.
"""
response = self.client_list.get_registered_devices(limit='20')[0]
uri, args = response['uri'].split('?')
Expand All @@ -38,7 +40,7 @@ def test_get_registered_devices_limit(self):
{'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['0'], })

def test_get_registered_devices_offset(self):
""" Test to get desktop tokens with offset.
""" Test to get registered devices with offset.
"""
response = self.client_list.get_registered_devices(offset='20')[0]
uri, args = response['uri'].split('?')
Expand All @@ -49,7 +51,7 @@ def test_get_registered_devices_offset(self):
{'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], })

def test_get_registered_devices_limit_offset(self):
""" Test to get desktop tokens with limit and offset.
""" Test to get registered devices with limit and offset.
"""
response = self.client_list.get_registered_devices(limit='20', offset='2')[0]
uri, args = response['uri'].split('?')
Expand All @@ -68,3 +70,107 @@ def test_delete_registered_device(self):
self.assertEqual(response['method'], 'DELETE')
self.assertEqual(uri, '/admin/v1/registered_devices/CRSFWW1YWVNUXMBJ1J29')
self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]})

def test_get_blocked_devices_generator(self):
""" Test to get blocked devices generator.
"""
generator = self.client_list.get_blocked_devices_generator()
response = next(generator)
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'GET')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked')
self.assertEqual(
util.params_to_dict(args),
{'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }
)

def test_get_blocked_devices(self):
""" Test to get blocked devices without params.
"""
response = self.client_list.get_blocked_devices()[0]
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'GET')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked')
self.assertEqual(
util.params_to_dict(args),
{'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }
)

def test_get_blocked_devices_limit(self):
""" Test to get blocked devices with limit.
"""
response = self.client_list.get_blocked_devices(limit='20')[0]
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'GET')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked')
self.assertEqual(
util.params_to_dict(args),
{'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['0'], }
)

def test_get_blocked_devices_offset(self):
""" Test to get blocked devices with offset.
"""
response = self.client_list.get_blocked_devices(offset='20')[0]
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'GET')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked')
self.assertEqual(
util.params_to_dict(args),
{'account_id': [self.client_list.account_id], 'limit': ['100'], 'offset': ['0'], }
)

def test_get_blocked_devices_limit_offset(self):
""" Test to get blocked devices with limit and offset.
"""
response = self.client_list.get_blocked_devices(limit='20', offset='2')[0]
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'GET')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked')
self.assertEqual(
util.params_to_dict(args),
{'account_id': [self.client_list.account_id], 'limit': ['20'], 'offset': ['2'], }
)

def test_block_registered_device_by_id(self):
""" Test to block registered device by registered device id.
"""
response = self.client.block_registered_device_by_id('CRSFWW1YWVNUXMBJ1J29')

self.assertEqual(response['method'], 'POST')
self.assertEqual(response['uri'], '/admin/v1/registered_devices/blocked/CRSFWW1YWVNUXMBJ1J29')
self.assertEqual(json.loads(response['body'])['account_id'], self.client.account_id)

def test_block_registered_devices(self):
""" Test to block registered devices.
"""
response = self.client.block_registered_devices(['CRSFWW1YWVNUXMBJ1J29', 'CRSFWW1YWVNUXMBJ1J30'])

self.assertEqual(response['method'], 'POST')
self.assertEqual(response['uri'], '/admin/v1/registered_devices/blocked')
self.assertEqual(json.loads(response['body'])['account_id'], self.client.account_id)

def test_unblock_registered_device_by_id(self):
""" Test to unblock registered device by registered device id.
"""
response = self.client.unblock_registered_device_by_id('CRSFWW1YWVNUXMBJ1J29')
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'DELETE')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked/CRSFWW1YWVNUXMBJ1J29')
self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]})

def test_unblock_registered_devices(self):
""" Test to unblock registered devices.
"""
response = self.client.unblock_registered_devices(['CRSFWW1YWVNUXMBJ1J29', 'CRSFWW1YWVNUXMBJ1J30'])
uri, args = response['uri'].split('?')

self.assertEqual(response['method'], 'DELETE')
self.assertEqual(uri, '/admin/v1/registered_devices/blocked')
self.assertEqual(util.params_to_dict(args)['account_id'], [self.client.account_id])