44This test suite verifies:
551. Circuit breaker opens after rate limit failures (429/503)
662. Circuit breaker blocks subsequent calls while open
7- 3. Circuit breaker transitions through states correctly
8- 4. Circuit breaker does not trigger for non-rate-limit errors
9- 5. Circuit breaker can be disabled via configuration flag
10- 6. Circuit breaker closes after reset timeout
7+ 3. Circuit breaker does not trigger for non-rate-limit errors
8+ 4. Circuit breaker can be disabled via configuration flag
9+ 5. Circuit breaker closes after reset timeout
1110
1211Run with:
1312 pytest tests/e2e/test_circuit_breaker.py -v -s
@@ -32,20 +31,16 @@ def aggressive_circuit_breaker_config():
3231 """
3332 from databricks .sql .telemetry import circuit_breaker_manager
3433
35- # Store original values
3634 original_minimum_calls = circuit_breaker_manager .MINIMUM_CALLS
3735 original_reset_timeout = circuit_breaker_manager .RESET_TIMEOUT
3836
39- # Patch with aggressive test values
4037 circuit_breaker_manager .MINIMUM_CALLS = 2
4138 circuit_breaker_manager .RESET_TIMEOUT = 5
4239
43- # Reset all circuit breakers before test
4440 CircuitBreakerManager ._instances .clear ()
4541
4642 yield
4743
48- # Cleanup: restore original values and reset breakers
4944 circuit_breaker_manager .MINIMUM_CALLS = original_minimum_calls
5045 circuit_breaker_manager .RESET_TIMEOUT = original_reset_timeout
5146 CircuitBreakerManager ._instances .clear ()
@@ -59,23 +54,35 @@ def get_details(self, connection_details):
5954 """Get connection details from pytest fixture"""
6055 self .arguments = connection_details .copy ()
6156
62- def test_circuit_breaker_opens_after_rate_limit_errors (self ):
57+ def create_mock_response (self , status_code ):
58+ """Helper to create mock HTTP response."""
59+ response = MagicMock (spec = HTTPResponse )
60+ response .status = status_code
61+ response .data = {
62+ 429 : b"Too Many Requests" ,
63+ 503 : b"Service Unavailable" ,
64+ 500 : b"Internal Server Error" ,
65+ }.get (status_code , b"Response" )
66+ return response
67+
68+ @pytest .mark .parametrize ("status_code,should_trigger" , [
69+ (429 , True ),
70+ (503 , True ),
71+ (500 , False ),
72+ ])
73+ def test_circuit_breaker_triggers_for_rate_limit_codes (self , status_code , should_trigger ):
6374 """
64- Verify circuit breaker opens after 429/503 errors and blocks subsequent calls .
75+ Verify circuit breaker opens for rate-limit codes ( 429/503) but not others (500) .
6576 """
6677 request_count = {"count" : 0 }
6778
68- def mock_rate_limited_request (* args , ** kwargs ):
69- """Mock that returns 429 rate limit response"""
79+ def mock_request (* args , ** kwargs ):
7080 request_count ["count" ] += 1
71- response = MagicMock (spec = HTTPResponse )
72- response .status = 429
73- response .data = b"Too Many Requests"
74- return response
81+ return self .create_mock_response (status_code )
7582
7683 with patch (
7784 "databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request" ,
78- side_effect = mock_rate_limited_request ,
85+ side_effect = mock_request ,
7986 ):
8087 with sql .connect (
8188 server_hostname = self .arguments ["host" ],
@@ -89,88 +96,34 @@ def mock_rate_limited_request(*args, **kwargs):
8996 self .arguments ["host" ]
9097 )
9198
92- # Initial state should be CLOSED
9399 assert circuit_breaker .current_state == STATE_CLOSED
94100
95101 cursor = conn .cursor ()
96102
97- # Execute queries to trigger telemetry failures
98- cursor .execute ("SELECT 1" )
99- cursor .fetchone ()
100- time .sleep (1 )
101-
102- cursor .execute ("SELECT 2" )
103- cursor .fetchone ()
104- time .sleep (2 )
105-
106- # Circuit should now be OPEN after 2 failures
107- assert circuit_breaker .current_state == STATE_OPEN
108- assert circuit_breaker .fail_counter == 2
109-
110- # Track requests before executing another query
111- requests_before = request_count ["count" ]
112-
113- # Execute another query - circuit breaker should block telemetry
114- cursor .execute ("SELECT 3" )
115- cursor .fetchone ()
116- time .sleep (1 )
117-
118- requests_after = request_count ["count" ]
119-
120- # No new telemetry requests should be made (circuit is open)
121- assert (
122- requests_after == requests_before
123- ), "Circuit breaker should block requests while OPEN"
124-
125- def test_circuit_breaker_does_not_trigger_for_non_rate_limit_errors (self ):
126- """
127- Verify circuit breaker does NOT open for errors other than 429/503.
128- Only rate limit errors should trigger the circuit breaker.
129- """
130- request_count = {"count" : 0 }
131-
132- def mock_server_error_request (* args , ** kwargs ):
133- """Mock that returns 500 server error (not rate limit)"""
134- request_count ["count" ] += 1
135- response = MagicMock (spec = HTTPResponse )
136- response .status = 500 # Server error - should NOT trigger CB
137- response .data = b"Internal Server Error"
138- return response
139-
140- with patch (
141- "databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request" ,
142- side_effect = mock_server_error_request ,
143- ):
144- with sql .connect (
145- server_hostname = self .arguments ["host" ],
146- http_path = self .arguments ["http_path" ],
147- access_token = self .arguments .get ("access_token" ),
148- force_enable_telemetry = True ,
149- telemetry_batch_size = 1 ,
150- _telemetry_circuit_breaker_enabled = True ,
151- ) as conn :
152- circuit_breaker = CircuitBreakerManager .get_circuit_breaker (
153- self .arguments ["host" ]
154- )
155-
156- cursor = conn .cursor ()
157-
158- # Execute multiple queries with 500 errors
159- for i in range (5 ):
103+ # Execute queries to trigger telemetry
104+ for i in range (1 , 6 ):
160105 cursor .execute (f"SELECT { i } " )
161106 cursor .fetchone ()
162107 time .sleep (0.5 )
163108
164- # Circuit should remain CLOSED (500 errors don't trigger CB)
165- assert (
166- circuit_breaker .current_state == STATE_CLOSED
167- ), "Circuit should stay CLOSED for non-rate-limit errors"
168- assert (
169- circuit_breaker .fail_counter == 0
170- ), "Non-rate-limit errors should not increment fail counter"
109+ if should_trigger :
110+ # Circuit should be OPEN after 2 rate-limit failures
111+ assert circuit_breaker .current_state == STATE_OPEN
112+ assert circuit_breaker .fail_counter == 2
113+
114+ # Track requests before another query
115+ requests_before = request_count ["count" ]
116+ cursor .execute ("SELECT 99" )
117+ cursor .fetchone ()
118+ time .sleep (1 )
171119
172- # Requests should still go through
173- assert request_count ["count" ] >= 5 , "Requests should not be blocked"
120+ # No new telemetry requests (circuit is open)
121+ assert request_count ["count" ] == requests_before
122+ else :
123+ # Circuit should remain CLOSED for non-rate-limit errors
124+ assert circuit_breaker .current_state == STATE_CLOSED
125+ assert circuit_breaker .fail_counter == 0
126+ assert request_count ["count" ] >= 5
174127
175128 def test_circuit_breaker_disabled_allows_all_calls (self ):
176129 """
@@ -180,12 +133,8 @@ def test_circuit_breaker_disabled_allows_all_calls(self):
180133 request_count = {"count" : 0 }
181134
182135 def mock_rate_limited_request (* args , ** kwargs ):
183- """Mock that returns 429"""
184136 request_count ["count" ] += 1
185- response = MagicMock (spec = HTTPResponse )
186- response .status = 429
187- response .data = b"Too Many Requests"
188- return response
137+ return self .create_mock_response (429 )
189138
190139 with patch (
191140 "databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request" ,
@@ -201,16 +150,12 @@ def mock_rate_limited_request(*args, **kwargs):
201150 ) as conn :
202151 cursor = conn .cursor ()
203152
204- # Execute multiple queries
205153 for i in range (5 ):
206154 cursor .execute (f"SELECT { i } " )
207155 cursor .fetchone ()
208156 time .sleep (0.3 )
209157
210- # All requests should go through (no circuit breaker)
211- assert (
212- request_count ["count" ] >= 5
213- ), "All requests should go through when CB disabled"
158+ assert request_count ["count" ] >= 5
214159
215160 def test_circuit_breaker_recovers_after_reset_timeout (self ):
216161 """
@@ -221,20 +166,9 @@ def test_circuit_breaker_recovers_after_reset_timeout(self):
221166 fail_requests = {"enabled" : True }
222167
223168 def mock_conditional_request (* args , ** kwargs ):
224- """Mock that fails initially, then succeeds"""
225169 request_count ["count" ] += 1
226- response = MagicMock (spec = HTTPResponse )
227-
228- if fail_requests ["enabled" ]:
229- # Return 429 to trigger circuit breaker
230- response .status = 429
231- response .data = b"Too Many Requests"
232- else :
233- # Return success
234- response .status = 200
235- response .data = b"OK"
236-
237- return response
170+ status = 429 if fail_requests ["enabled" ] else 200
171+ return self .create_mock_response (status )
238172
239173 with patch (
240174 "databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request" ,
@@ -263,7 +197,6 @@ def mock_conditional_request(*args, **kwargs):
263197 cursor .fetchone ()
264198 time .sleep (2 )
265199
266- # Circuit should be OPEN
267200 assert circuit_breaker .current_state == STATE_OPEN
268201
269202 # Wait for reset timeout (5 seconds in test)
@@ -277,7 +210,7 @@ def mock_conditional_request(*args, **kwargs):
277210 cursor .fetchone ()
278211 time .sleep (1 )
279212
280- # Circuit should be HALF_OPEN or CLOSED (testing recovery)
213+ # Circuit should be recovering
281214 assert circuit_breaker .current_state in [
282215 STATE_HALF_OPEN ,
283216 STATE_CLOSED ,
@@ -288,61 +221,12 @@ def mock_conditional_request(*args, **kwargs):
288221 cursor .fetchone ()
289222 time .sleep (1 )
290223
291- # Eventually should be CLOSED if requests succeed
292- # (may take a few successful requests to close from HALF_OPEN)
293224 current_state = circuit_breaker .current_state
294225 assert current_state in [
295226 STATE_CLOSED ,
296227 STATE_HALF_OPEN ,
297228 ], f"Circuit should recover to CLOSED or HALF_OPEN, got { current_state } "
298229
299- def test_circuit_breaker_503_also_triggers_circuit (self ):
300- """
301- Verify circuit breaker opens for 503 Service Unavailable errors
302- in addition to 429 rate limit errors.
303- """
304- request_count = {"count" : 0 }
305-
306- def mock_service_unavailable_request (* args , ** kwargs ):
307- """Mock that returns 503 service unavailable"""
308- request_count ["count" ] += 1
309- response = MagicMock (spec = HTTPResponse )
310- response .status = 503 # Service unavailable - should trigger CB
311- response .data = b"Service Unavailable"
312- return response
313-
314- with patch (
315- "databricks.sql.telemetry.telemetry_push_client.TelemetryPushClient.request" ,
316- side_effect = mock_service_unavailable_request ,
317- ):
318- with sql .connect (
319- server_hostname = self .arguments ["host" ],
320- http_path = self .arguments ["http_path" ],
321- access_token = self .arguments .get ("access_token" ),
322- force_enable_telemetry = True ,
323- telemetry_batch_size = 1 ,
324- _telemetry_circuit_breaker_enabled = True ,
325- ) as conn :
326- circuit_breaker = CircuitBreakerManager .get_circuit_breaker (
327- self .arguments ["host" ]
328- )
329-
330- cursor = conn .cursor ()
331-
332- # Execute queries to trigger 503 failures
333- cursor .execute ("SELECT 1" )
334- cursor .fetchone ()
335- time .sleep (1 )
336-
337- cursor .execute ("SELECT 2" )
338- cursor .fetchone ()
339- time .sleep (2 )
340-
341- # Circuit should be OPEN after 2 x 503 errors
342- assert (
343- circuit_breaker .current_state == STATE_OPEN
344- ), "503 errors should trigger circuit breaker"
345-
346230
347231if __name__ == "__main__" :
348232 pytest .main ([__file__ , "-v" , "-s" ])
0 commit comments