Skip to content

Commit 8f5e5ba

Browse files
committed
removed the connection.close() from that test, put debug statement before and after TelemetryClientFactory lock
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent 6b1d1b8 commit 8f5e5ba

File tree

2 files changed

+80
-79
lines changed

2 files changed

+80
-79
lines changed

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 80 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -31,79 +31,79 @@
3131
logger = logging.getLogger(__name__)
3232

3333

34-
class DebugLock:
35-
"""A wrapper around threading.Lock that provides detailed debugging for lock acquisition/release"""
36-
37-
def __init__(self, name: str = "DebugLock"):
38-
self._lock = threading.Lock()
39-
self._name = name
40-
self._owner: Optional[str] = None
41-
self._waiters: List[str] = []
42-
self._debug_logger = logging.getLogger(f"{__name__}.{name}")
43-
# Ensure debug logging is visible
44-
if not self._debug_logger.handlers:
45-
handler = logging.StreamHandler()
46-
formatter = logging.Formatter(
47-
":lock: %(asctime)s [%(threadName)s-%(thread)d] LOCK-%(name)s: %(message)s"
48-
)
49-
handler.setFormatter(formatter)
50-
self._debug_logger.addHandler(handler)
51-
self._debug_logger.setLevel(logging.DEBUG)
52-
53-
def acquire(self, blocking=True, timeout=-1):
54-
current = threading.current_thread()
55-
thread_info = f"{current.name}-{current.ident}"
56-
if self._owner:
57-
self._debug_logger.warning(
58-
f": WAITING: {thread_info} waiting for lock held by {self._owner}"
59-
)
60-
self._waiters.append(thread_info)
61-
else:
62-
self._debug_logger.debug(
63-
f": TRYING: {thread_info} attempting to acquire lock"
64-
)
65-
# Try to acquire the lock
66-
acquired = self._lock.acquire(blocking, timeout)
67-
if acquired:
68-
self._owner = thread_info
69-
self._debug_logger.info(f": ACQUIRED: {thread_info} got the lock")
70-
if self._waiters:
71-
self._debug_logger.info(
72-
f": WAITERS: {len(self._waiters)} threads waiting: {self._waiters}"
73-
)
74-
else:
75-
self._debug_logger.error(
76-
f": FAILED: {thread_info} failed to acquire lock (timeout)"
77-
)
78-
if thread_info in self._waiters:
79-
self._waiters.remove(thread_info)
80-
return acquired
81-
82-
def release(self):
83-
current = threading.current_thread()
84-
thread_info = f"{current.name}-{current.ident}"
85-
if self._owner != thread_info:
86-
self._debug_logger.error(
87-
f": ERROR: {thread_info} trying to release lock owned by {self._owner}"
88-
)
89-
else:
90-
self._debug_logger.info(f": RELEASED: {thread_info} released the lock")
91-
self._owner = None
92-
# Remove from waiters if present
93-
if thread_info in self._waiters:
94-
self._waiters.remove(thread_info)
95-
if self._waiters:
96-
self._debug_logger.info(
97-
f": NEXT: {len(self._waiters)} threads still waiting: {self._waiters}"
98-
)
99-
self._lock.release()
100-
101-
def __enter__(self):
102-
self.acquire()
103-
return self
104-
105-
def __exit__(self, exc_type, exc_val, exc_tb):
106-
self.release()
34+
# class DebugLock:
35+
# """A wrapper around threading.Lock that provides detailed debugging for lock acquisition/release"""
36+
37+
# def __init__(self, name: str = "DebugLock"):
38+
# self._lock = threading.Lock()
39+
# self._name = name
40+
# self._owner: Optional[str] = None
41+
# self._waiters: List[str] = []
42+
# self._debug_logger = logging.getLogger(f"{__name__}.{name}")
43+
# # Ensure debug logging is visible
44+
# if not self._debug_logger.handlers:
45+
# handler = logging.StreamHandler()
46+
# formatter = logging.Formatter(
47+
# ":lock: %(asctime)s [%(threadName)s-%(thread)d] LOCK-%(name)s: %(message)s"
48+
# )
49+
# handler.setFormatter(formatter)
50+
# self._debug_logger.addHandler(handler)
51+
# self._debug_logger.setLevel(logging.DEBUG)
52+
53+
# def acquire(self, blocking=True, timeout=-1):
54+
# current = threading.current_thread()
55+
# thread_info = f"{current.name}-{current.ident}"
56+
# if self._owner:
57+
# self._debug_logger.warning(
58+
# f": WAITING: {thread_info} waiting for lock held by {self._owner}"
59+
# )
60+
# self._waiters.append(thread_info)
61+
# else:
62+
# self._debug_logger.debug(
63+
# f": TRYING: {thread_info} attempting to acquire lock"
64+
# )
65+
# # Try to acquire the lock
66+
# acquired = self._lock.acquire(blocking, timeout)
67+
# if acquired:
68+
# self._owner = thread_info
69+
# self._debug_logger.info(f": ACQUIRED: {thread_info} got the lock")
70+
# if self._waiters:
71+
# self._debug_logger.info(
72+
# f": WAITERS: {len(self._waiters)} threads waiting: {self._waiters}"
73+
# )
74+
# else:
75+
# self._debug_logger.error(
76+
# f": FAILED: {thread_info} failed to acquire lock (timeout)"
77+
# )
78+
# if thread_info in self._waiters:
79+
# self._waiters.remove(thread_info)
80+
# return acquired
81+
82+
# def release(self):
83+
# current = threading.current_thread()
84+
# thread_info = f"{current.name}-{current.ident}"
85+
# if self._owner != thread_info:
86+
# self._debug_logger.error(
87+
# f": ERROR: {thread_info} trying to release lock owned by {self._owner}"
88+
# )
89+
# else:
90+
# self._debug_logger.info(f": RELEASED: {thread_info} released the lock")
91+
# self._owner = None
92+
# # Remove from waiters if present
93+
# if thread_info in self._waiters:
94+
# self._waiters.remove(thread_info)
95+
# if self._waiters:
96+
# self._debug_logger.info(
97+
# f": NEXT: {len(self._waiters)} threads still waiting: {self._waiters}"
98+
# )
99+
# self._lock.release()
100+
101+
# def __enter__(self):
102+
# self.acquire()
103+
# return self
104+
105+
# def __exit__(self, exc_type, exc_val, exc_tb):
106+
# self.release()
107107

108108

109109
class TelemetryHelper:
@@ -430,10 +430,10 @@ class TelemetryClientFactory:
430430
] = {} # Map of session_id_hex -> BaseTelemetryClient
431431
_executor: Optional[ThreadPoolExecutor] = None
432432
_initialized: bool = False
433-
# _lock = threading.Lock() # Thread safety for factory operations
434-
_lock = DebugLock(
435-
"TelemetryClientFactory"
436-
) # Thread safety for factory operations with debugging
433+
_lock = threading.Lock() # Thread safety for factory operations
434+
# _lock = DebugLock(
435+
# "TelemetryClientFactory"
436+
# ) # Thread safety for factory operations with debugging
437437
_original_excepthook = None
438438
_excepthook_installed = False
439439

@@ -483,8 +483,9 @@ def initialize_telemetry_client(
483483
):
484484
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
485485
try:
486-
486+
print("Initializing telemetry client: %s", session_id_hex)
487487
with TelemetryClientFactory._lock:
488+
print("Initializing telemetry client, got lock: %s", session_id_hex)
488489
TelemetryClientFactory._initialize()
489490

490491
if session_id_hex not in TelemetryClientFactory._clients:
@@ -506,6 +507,7 @@ def initialize_telemetry_client(
506507
TelemetryClientFactory._clients[
507508
session_id_hex
508509
] = NoopTelemetryClient()
510+
print("Telemetry client initialized: %s", session_id_hex)
509511
except Exception as e:
510512
logger.debug("Failed to initialize telemetry client: %s", e)
511513
# Fallback to NoopTelemetryClient to ensure connection doesn't fail

tests/unit/test_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,6 @@ def test_arraysize_buffer_size_passthrough(
264264

265265
self.assertEqual(kwargs["arraysize"], 999)
266266
self.assertEqual(kwargs["result_buffer_size_bytes"], 1234)
267-
connection.close()
268267

269268
def test_closing_result_set_with_closed_connection_soft_closes_commands(self):
270269
mock_connection = Mock()

0 commit comments

Comments
 (0)