diff --git a/duo_client/admin.py b/duo_client/admin.py index 369b845..7deb218 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -182,8 +182,6 @@ from datetime import datetime, timedelta, timezone from . import Accounts, client -from .logs.telephony import Telephony - USER_STATUS_ACTIVE = "active" USER_STATUS_BYPASS = "bypass" USER_STATUS_DISABLED = "disabled" @@ -629,12 +627,21 @@ def get_activity_logs(self, **kwargs): row['host'] = self.host return response - def get_telephony_log(self, mintime=0, api_version=1, **kwargs): + def get_telephony_log(self, mintime: int = 0, api_version: int = 1, maxtime: Optional[int] = 0, + limit: Optional[int] = 100, sort: Optional[str] = 'desc', + next_offset: Optional[str] = None, account_id = None, + filters = None, **kwargs): """ Returns telephony log events. mintime - Fetch events only >= mintime (to avoid duplicate records that have already been fetched) + maxtime - Fetch events only <= maxtime. (API Version 2 only) + limit - Number of results to limit to, default 100, max 1000. (API Version 2 only) + sort - Sort order to be applied, default 'desc'. (API Version 2 only) + next_offset - Used to grab the next set of results from a previous response. (API Version 2 only) + account_id - Filter by account_id. (API Version 2 only) Type undocumented. + filters - Filter by filters. (API Version 2 only) Type undocumented. api_version - The API version of the handler to use. Currently, the default api version is v1, but the v1 API will be deprecated in a future version of the Duo Admin API. @@ -683,11 +690,42 @@ def get_telephony_log(self, mintime=0, api_version=1, **kwargs): if api_version not in [1,2]: raise ValueError("Invalid API Version") + + params = {} - if api_version == 2: - return Telephony.get_telephony_logs_v2(self.json_api_call, self.host, **kwargs) - return Telephony.get_telephony_logs_v1(self.json_api_call, self.host, mintime=mintime) + today = datetime.now(tz=timezone.utc) + # If mintime is not provided, the script defaults it to 180 days in past + mintime = int((today - timedelta(days=180)).timestamp() * (1000 if api_version == 2 else 1)) if not mintime else mintime + params["mintime"] = f"{int(mintime)}" + + if api_version == 2: # Add additional parameters for API Version 2 + if limit > 1000: + limit = 1000 # Limit is capped at 1000 + params["limit"] = f"{int(limit)}" + + params["sort"] = 'ts:desc' if sort.lower() == 'desc' else 'ts:asc' + # if maxtime is not provided, the script defaults it to now + maxtime = int(today.timestamp() * 1000) - 120 if not maxtime else maxtime + params["maxtime"] = f"{int(maxtime)}" + if next_offset: + params["next_offset"] = next_offset + if account_id: + params["account_id"] = account_id + if filters: + params["filters"] = filters + response = self.json_api_call("GET", '/admin/v{}/logs/telephony'.format(api_version), params) + if api_version == 1: + for row in response: + row["eventtype"] = "telephony" + row["host"] = self.host + else: + for row in response["items"]: + row["eventtype"] = "telephony" + row["host"] = self.host + + return response + def get_users_iterator(self): """ Returns iterator of user objects. @@ -3804,4 +3842,4 @@ def set_telephony_credits(self, credits): } return self.json_api_call('POST', '/admin/v1/billing/telephony_credits', - params) + params) \ No newline at end of file diff --git a/examples/Admin/log_examples.py b/examples/Admin/log_examples.py index e467d04..43286ef 100644 --- a/examples/Admin/log_examples.py +++ b/examples/Admin/log_examples.py @@ -79,7 +79,7 @@ def get_next_arg(prompt, default=None): ] ) if log_type == "telephony_v2": - telephony_logs = admin_api.get_telephony_log(api_version=2, kwargs=params) + telephony_logs = admin_api.get_telephony_log(api_version=2, **params) reporter.writerow(("telephony_id", "txid", "credits", "context", "phone", "type")) for log in telephony_logs["items"]: diff --git a/tests/admin/test_telephony.py b/tests/admin/test_telephony.py index 20a6d0f..5fdc9b2 100644 --- a/tests/admin/test_telephony.py +++ b/tests/admin/test_telephony.py @@ -33,6 +33,41 @@ def test_get_telephony_logs_v2_no_args(self): self.assertEqual(uri, "/admin/v2/logs/telephony") self.assertEqual(param_dict["mintime"], [expected_mintime]) self.assertEqual(param_dict["maxtime"], [expected_maxtime]) + self.assertAlmostEqual(param_dict["sort"], ["ts:desc"]) + self.assertEqual(param_dict["limit"], ["100"]) + + @freeze_time("2022-10-01") + def test_get_telephony_logs_v2_with_args(self): + mintime = datetime(2022, 9, 1, 0, 0, 0, tzinfo=pytz.utc) + expected_mintime = str(int(mintime.timestamp() * 1000)) + maxtime = datetime(2022, 10, 1, 0, 0, 0, tzinfo=pytz.utc) + expected_maxtime = str(int(maxtime.timestamp() * 1000) - 120) + params = {"mintime": expected_mintime, "sort": "asc", "limit": 900} + response = self.items_response_client.get_telephony_log(api_version=2, + **params) + uri, args = response["uri"].split("?") + param_dict = util.params_to_dict(args) + self.assertEqual(response["method"], "GET") + self.assertEqual(uri, "/admin/v2/logs/telephony") + self.assertEqual(param_dict["mintime"], [expected_mintime]) + self.assertEqual(param_dict["maxtime"], [expected_maxtime]) + self.assertEqual(param_dict["sort"], ["ts:asc"]) + self.assertEqual(param_dict["limit"], ["900"]) + + @freeze_time("2022-10-01") + def test_get_telephony_logs_v2_with_unsupported_args(self): + params = { + "unsupported": "argument", + "non_existent": "argument" + } + response = self.items_response_client.get_telephony_log(api_version=2, + **params) + uri, args = response["uri"].split("?") + param_dict = util.params_to_dict(args) + self.assertEqual(response["method"], "GET") + self.assertEqual(uri, "/admin/v2/logs/telephony") + self.assertNotIn("unsupported", param_dict) + self.assertNotIn("non_existent", param_dict) @freeze_time("2022-10-01") def test_get_telephony_logs_v1_no_args(self): @@ -40,3 +75,32 @@ def test_get_telephony_logs_v1_no_args(self): uri, args = response[0]["uri"].split("?") self.assertEqual(response[0]["method"], "GET") self.assertEqual(uri, "/admin/v1/logs/telephony") + + @freeze_time("2022-10-01") + def test_get_telephony_logs_v1_with_args(self): + freezed_time = datetime(2022, 9, 1, 0, 0, 0, tzinfo=pytz.utc) + expected_mintime = str( + int((freezed_time - timedelta(days=180)).timestamp()) + ) + response = self.client_list.get_telephony_log(mintime=expected_mintime) + uri, args = response[0]["uri"].split("?") + param_dict = util.params_to_dict(args) + self.assertEqual(response[0]["method"], "GET") + self.assertEqual(uri, "/admin/v1/logs/telephony") + self.assertEqual(param_dict["mintime"], [expected_mintime]) + + @freeze_time("2022-10-01") + def test_get_telephony_logs_v1_ignore_v2_args(self): + freezed_time = datetime(2022, 9, 1, 0, 0, 0, tzinfo=pytz.utc) + expected_mintime = str( + int((freezed_time - timedelta(days=180)).timestamp()) + ) + params = {"mintime": expected_mintime, "limit": 20, "sort": "ts:asc"} + response = self.client_list.get_telephony_log(**params) + uri, args = response[0]["uri"].split("?") + param_dict = util.params_to_dict(args) + self.assertEqual(response[0]["method"], "GET") + self.assertEqual(uri, "/admin/v1/logs/telephony") + self.assertEqual(param_dict["mintime"], [expected_mintime]) + self.assertNotIn(param_dict["limit"], ["limit"]) + self.assertNotIn(param_dict["sort"], ["sort"]) \ No newline at end of file