Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 100 additions & 48 deletions TM1py/Services/RestService.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ def __init__(self, **kwargs):
- **impersonate** (str): Name of user to impersonate.
- **re_connect_on_session_timeout** (bool): Attempt to reconnect once if session is timed out.
- **re_connect_on_remote_disconnect** (bool): Attempt to reconnect once if connection is aborted by remote end.
- **remote_disconnect_max_retries** (int): Maximum number of retry attempts after remote disconnect (default: 5).
- **remote_disconnect_retry_delay** (float): Initial delay in seconds before first retry attempt (default: 1).
- **remote_disconnect_max_delay** (float): Maximum delay cap in seconds between retry attempts (default: 30).
- **remote_disconnect_backoff_factor** (float): Multiplier for exponential backoff between retry attempts (default: 2).
- **async_polling_initial_delay** (float): Initial polling delay in seconds for async operations (default: 0.1).
- **async_polling_max_delay** (float): Maximum polling delay cap in seconds for async operations (default: 1.0).
- **async_polling_backoff_factor** (float): Multiplier for exponential backoff in async polling (default: 2).
- **proxies** (dict): Dictionary with proxies, e.g. {'http': 'http://proxy.example.com:8080', 'https': 'http://secureproxy.example.com:8090'}.
- **ssl_context**: User-defined SSL context.
- **cert** (str|tuple): (Optional) If string, path to SSL client cert file (.pem). If tuple, ('cert', 'key') pair.
Expand Down Expand Up @@ -158,6 +165,13 @@ def __init__(self, **kwargs):
self._pool_connections = int(kwargs.get("pool_connections", self.DEFAULT_POOL_CONNECTIONS))
self._re_connect_on_session_timeout = kwargs.get("re_connect_on_session_timeout", True)
self._re_connect_on_remote_disconnect = kwargs.get("re_connect_on_remote_disconnect", True)
self._remote_disconnect_max_retries = int(kwargs.get("remote_disconnect_max_retries", 5))
self._remote_disconnect_retry_delay = float(kwargs.get("remote_disconnect_retry_delay", 1))
self._remote_disconnect_max_delay = float(kwargs.get("remote_disconnect_max_delay", 30))
self._remote_disconnect_backoff_factor = float(kwargs.get("remote_disconnect_backoff_factor", 2))
self._async_polling_initial_delay = float(kwargs.get("async_polling_initial_delay", 0.1))
self._async_polling_max_delay = float(kwargs.get("async_polling_max_delay", 1.0))
self._async_polling_backoff_factor = float(kwargs.get("async_polling_backoff_factor", 2))
# is retrieved on demand and then cached
self._sandboxing_disabled = None
# optional verbose logging to stdout
Expand Down Expand Up @@ -390,7 +404,7 @@ def _poll_async_response(self, async_id: str, timeout: float, cancel_at_timeout:
"""
Poll for async operation completion
"""
for wait in RestService.wait_time_generator(timeout):
for wait in self.wait_time_generator(timeout):
response = self.retrieve_async_response(async_id)
if response.status_code in [200, 201]:
return response
Expand Down Expand Up @@ -431,52 +445,77 @@ def _handle_remote_disconnect(
**kwargs,
):
"""
Handle remote disconnect errors with reconnection and retry logic
Handle remote disconnect errors with reconnection and retry logic using exponential backoff.

Retries up to `remote_disconnect_max_retries` times with capped exponential backoff delay.
The delay is calculated as: min(delay * backoff_factor^(attempt-1), max_delay).
"""
warnings.warn(f"Connection aborted due to remote disconnect. Attempting to reconnect: {original_error}")
warnings.warn(f"Connection aborted due to remote disconnect: {original_error}")

try:
# Reconnect
self._manage_http_adapter()
self.connect()
for attempt in range(1, self._remote_disconnect_max_retries + 1):
# Calculate delay with exponential backoff: delay * backoff_factor^(attempt-1), capped at max_delay
current_delay = min(
self._remote_disconnect_retry_delay * (self._remote_disconnect_backoff_factor ** (attempt - 1)),
self._remote_disconnect_max_delay
)

# Only retry if idempotent
if not idempotent:
warnings.warn(
f"Successfully reconnected but not retrying {method.upper()} request (idempotent={idempotent})"
)
raise original_error
warnings.warn(
f"Retry attempt {attempt}/{self._remote_disconnect_max_retries} "
f"after {current_delay:.1f}s delay..."
)

warnings.warn(f"Successfully reconnected. Retrying {method.upper()} request...")
time.sleep(current_delay)

# Retry the request using the same execution path
if not async_requests_mode:
response = self._execute_sync_request(method=method, url=url, data=data, timeout=timeout, **kwargs)
else:
response = self._execute_async_request(
method=method,
url=url,
data=data,
timeout=timeout,
cancel_at_timeout=cancel_at_timeout,
return_async_id=return_async_id,
**kwargs,
)
try:
# Reconnect
self._manage_http_adapter()
self.connect()

# Only retry if idempotent
if not idempotent:
warnings.warn(
f"Successfully reconnected but not retrying {method.upper()} request (idempotent={idempotent})"
)
raise original_error

# Verify and encode response
self.verify_response(response=response)
response.encoding = encoding
return response
warnings.warn(f"Successfully reconnected. Retrying {method.upper()} request...")

# Retry the request using the same execution path
if not async_requests_mode:
response = self._execute_sync_request(method=method, url=url, data=data, timeout=timeout, **kwargs)
else:
response = self._execute_async_request(
method=method,
url=url,
data=data,
timeout=timeout,
cancel_at_timeout=cancel_at_timeout,
return_async_id=return_async_id,
**kwargs,
)

# Verify and encode response
self.verify_response(response=response)
response.encoding = encoding
return response

except TM1pyTimeout:
# Re-raise timeout exceptions as-is
raise
except TM1pyRestException:
# Re-raise TM1 exceptions as-is
raise
except Exception as retry_error:
warnings.warn(f"Failed to reconnect or retry after remote disconnect: {retry_error}")
raise original_error
except TM1pyTimeout:
# Re-raise timeout exceptions as-is
raise
except TM1pyRestException:
# Re-raise TM1 exceptions as-is
raise
except Exception as retry_error:
warnings.warn(
f"Retry attempt {attempt}/{self._remote_disconnect_max_retries} failed: {retry_error}"
)
continue

# All retries exhausted
warnings.warn(
f"All {self._remote_disconnect_max_retries} retry attempts failed after remote disconnect"
)
raise original_error

def connect(self):
if "session_id" in self._kwargs:
Expand Down Expand Up @@ -1257,17 +1296,30 @@ def build_response_from_binary_response(data: bytes) -> Response:

return requests_response

@staticmethod
def wait_time_generator(timeout: int):
yield 0.1
yield 0.3
yield 0.6
def wait_time_generator(self, timeout: float):
"""
Generate wait times for async polling with capped exponential backoff.

Uses configurable parameters:
- async_polling_initial_delay: Starting delay
- async_polling_max_delay: Maximum delay cap
- async_polling_backoff_factor: Multiplier for each iteration

Default behavior (0.1s initial, 1.0s max, 2x factor) produces:
0.1s -> 0.2s -> 0.4s -> 0.8s -> 1.0s -> 1.0s -> ...
"""
delay = self._async_polling_initial_delay
elapsed = 0.0

if timeout:
for _ in range(1, int(timeout)):
yield 1
while elapsed < timeout:
yield delay
elapsed += delay
delay = min(delay * self._async_polling_backoff_factor, self._async_polling_max_delay)
else:
while True:
yield 1
yield delay
delay = min(delay * self._async_polling_backoff_factor, self._async_polling_max_delay)

def _determine_ssl_based_on_base_url(self) -> bool:
if self._base_url.startswith("https"):
Expand Down
11 changes: 9 additions & 2 deletions TM1py/Services/TM1Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,15 @@ def __init__(self, **kwargs):
- **integrated_login_host** (str): Host name for Service Principal Name.
- **integrated_login_delegate** (bool): Delegate user credentials to the server.
- **impersonate** (str): Name of the user to impersonate.
- **re_connect_on_session_timeout** (bool): Attempt to reconnect if the session times out.
- **re_connect_on_remote_disconnect** (bool): Attempt to reconnect if the connection is aborted.
- **re_connect_on_session_timeout** (bool): Attempt to reconnect once if session is timed out.
- **re_connect_on_remote_disconnect** (bool): Attempt to reconnect once if connection is aborted by remote end.
- **remote_disconnect_max_retries** (int): Maximum number of retry attempts after remote disconnect (default: 5).
- **remote_disconnect_retry_delay** (float): Initial delay in seconds before first retry attempt (default: 1).
- **remote_disconnect_max_delay** (float): Maximum delay cap in seconds between retry attempts (default: 30).
- **remote_disconnect_backoff_factor** (float): Multiplier for exponential backoff between retry attempts (default: 2).
- **async_polling_initial_delay** (float): Initial polling delay in seconds for async operations (default: 0.1).
- **async_polling_max_delay** (float): Maximum polling delay cap in seconds for async operations (default: 1.0).
- **async_polling_backoff_factor** (float): Multiplier for exponential backoff in async polling (default: 2).
- **proxies** (dict): Dictionary of proxies, e.g., {'http': 'http://proxy.example.com:8080'}.
- **ssl_context**: User-defined SSL context.
- **cert** (str or tuple): Path to SSL client cert file or ('cert', 'key') pair.
Expand Down
84 changes: 76 additions & 8 deletions Tests/RestService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,88 @@ def test_is_connected(self):
self.assertTrue(self.tm1._tm1_rest.is_connected())

def test_wait_time_generator_with_float_timeout(self):
self.assertEqual([0.1, 0.3, 0.6, 1, 1, 1, 1, 1, 1, 1, 1, 1], list(self.tm1._tm1_rest.wait_time_generator(10.0)))
self.assertEqual(sum(self.tm1._tm1_rest.wait_time_generator(10)), 10)
# With default params (0.1s initial, 1.0s max, 2x factor): 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
expected = [0.1, 0.2, 0.4, 0.8, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
self.assertEqual(expected, list(self.tm1._tm1_rest.wait_time_generator(10.0)))
self.assertEqual(10.5, sum(self.tm1._tm1_rest.wait_time_generator(10.0)))

def test_wait_time_generator_with_timeout(self):
self.assertEqual([0.1, 0.3, 0.6, 1, 1, 1, 1, 1, 1, 1, 1, 1], list(self.tm1._tm1_rest.wait_time_generator(10)))
self.assertEqual(sum(self.tm1._tm1_rest.wait_time_generator(10)), 10)
# With default params (0.1s initial, 1.0s max, 2x factor): 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
expected = [0.1, 0.2, 0.4, 0.8, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
self.assertEqual(expected, list(self.tm1._tm1_rest.wait_time_generator(10)))
self.assertEqual(10.5, sum(self.tm1._tm1_rest.wait_time_generator(10)))

def test_wait_time_generator_without_timeout(self):
# With default params (0.1s initial, 1.0s max, 2x factor): 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.1, next(generator))
self.assertEqual(0.3, next(generator))
self.assertEqual(0.6, next(generator))
self.assertEqual(1, next(generator))
self.assertEqual(1, next(generator))
self.assertEqual(0.2, next(generator))
self.assertEqual(0.4, next(generator))
self.assertEqual(0.8, next(generator))
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))

def test_wait_time_generator_custom_max_delay(self):
# Test with custom max_delay for long-running operations
original_max_delay = self.tm1._tm1_rest._async_polling_max_delay
try:
self.tm1._tm1_rest._async_polling_max_delay = 30.0
# With 0.1s initial, 30s max, 2x factor: 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.6 -> 3.2 -> 6.4 -> 12.8 -> 25.6 -> 30.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.1, next(generator))
self.assertEqual(0.2, next(generator))
self.assertEqual(0.4, next(generator))
self.assertEqual(0.8, next(generator))
self.assertEqual(1.6, next(generator))
self.assertEqual(3.2, next(generator))
self.assertEqual(6.4, next(generator))
self.assertEqual(12.8, next(generator))
self.assertEqual(25.6, next(generator))
self.assertEqual(30.0, next(generator))
self.assertEqual(30.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_max_delay = original_max_delay

def test_wait_time_generator_custom_backoff_factor(self):
# Test with custom backoff factor (3x instead of 2x)
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_backoff_factor = 3.0
# With 0.1s initial, 1.0s max, 3x factor: 0.1 -> 0.3 -> 0.9 -> 1.0 -> 1.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.1, next(generator))
self.assertAlmostEqual(0.3, next(generator), places=5)
self.assertAlmostEqual(0.9, next(generator), places=5)
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_wait_time_generator_custom_initial_delay(self):
# Test with custom initial delay
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.5
# With 0.5s initial, 1.0s max, 2x factor: 0.5 -> 1.0 -> 1.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.5, next(generator))
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial

def test_default_remote_disconnect_parameters(self):
# Verify default values for remote disconnect retry parameters
self.assertEqual(5, self.tm1._tm1_rest._remote_disconnect_max_retries)
self.assertEqual(1.0, self.tm1._tm1_rest._remote_disconnect_retry_delay)
self.assertEqual(30.0, self.tm1._tm1_rest._remote_disconnect_max_delay)
self.assertEqual(2.0, self.tm1._tm1_rest._remote_disconnect_backoff_factor)

def test_default_async_polling_parameters(self):
# Verify default values for async polling parameters
self.assertEqual(0.1, self.tm1._tm1_rest._async_polling_initial_delay)
self.assertEqual(1.0, self.tm1._tm1_rest._async_polling_max_delay)
self.assertEqual(2.0, self.tm1._tm1_rest._async_polling_backoff_factor)

def test_build_response_from_async_response_ok(self):
response_content = (
Expand Down