Skip to content

Commit 9fa5a89

Browse files
committed
finally
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent ed1db9d commit 9fa5a89

File tree

3 files changed

+3
-149
lines changed

3 files changed

+3
-149
lines changed

.github/workflows/code-quality-checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ jobs:
112112
# run test suite
113113
#----------------------------------------------
114114
- name: Run tests
115-
run: poetry run python -m pytest tests/unit -v -s
115+
run: poetry run python -m pytest tests/unit
116116
check-linting:
117117
runs-on: ubuntu-latest
118118
strategy:

src/databricks/sql/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ def _close(self, close_cursors=True) -> None:
449449
if close_cursors:
450450
for cursor in self._cursors:
451451
cursor.close()
452-
print(f"Closing session {self.get_session_id_hex()}")
452+
453453
logger.info(f"Closing session {self.get_session_id_hex()}")
454454
if not self.open:
455455
logger.debug("Session appears to have been closed already")

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 1 addition & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -31,81 +31,6 @@
3131
logger = logging.getLogger(__name__)
3232

3333

34-
# class DebugLock:
35-
# """A wrapper around threading.Lock that provides detailed debugging for lock acquisition/release"""
36-
37-
# def __init__(self, name: str = "DebugLock"):
38-
# self._lock = threading.Lock()
39-
# self._name = name
40-
# self._owner: Optional[str] = None
41-
# self._waiters: List[str] = []
42-
# self._debug_logger = logging.getLogger(f"{__name__}.{name}")
43-
# # Ensure debug logging is visible
44-
# if not self._debug_logger.handlers:
45-
# handler = logging.StreamHandler()
46-
# formatter = logging.Formatter(
47-
# ":lock: %(asctime)s [%(threadName)s-%(thread)d] LOCK-%(name)s: %(message)s"
48-
# )
49-
# handler.setFormatter(formatter)
50-
# self._debug_logger.addHandler(handler)
51-
# self._debug_logger.setLevel(logging.DEBUG)
52-
53-
# def acquire(self, blocking=True, timeout=-1):
54-
# current = threading.current_thread()
55-
# thread_info = f"{current.name}-{current.ident}"
56-
# if self._owner:
57-
# self._debug_logger.warning(
58-
# f": WAITING: {thread_info} waiting for lock held by {self._owner}"
59-
# )
60-
# self._waiters.append(thread_info)
61-
# else:
62-
# self._debug_logger.debug(
63-
# f": TRYING: {thread_info} attempting to acquire lock"
64-
# )
65-
# # Try to acquire the lock
66-
# acquired = self._lock.acquire(blocking, timeout)
67-
# if acquired:
68-
# self._owner = thread_info
69-
# self._debug_logger.info(f": ACQUIRED: {thread_info} got the lock")
70-
# if self._waiters:
71-
# self._debug_logger.info(
72-
# f": WAITERS: {len(self._waiters)} threads waiting: {self._waiters}"
73-
# )
74-
# else:
75-
# self._debug_logger.error(
76-
# f": FAILED: {thread_info} failed to acquire lock (timeout)"
77-
# )
78-
# if thread_info in self._waiters:
79-
# self._waiters.remove(thread_info)
80-
# return acquired
81-
82-
# def release(self):
83-
# current = threading.current_thread()
84-
# thread_info = f"{current.name}-{current.ident}"
85-
# if self._owner != thread_info:
86-
# self._debug_logger.error(
87-
# f": ERROR: {thread_info} trying to release lock owned by {self._owner}"
88-
# )
89-
# else:
90-
# self._debug_logger.info(f": RELEASED: {thread_info} released the lock")
91-
# self._owner = None
92-
# # Remove from waiters if present
93-
# if thread_info in self._waiters:
94-
# self._waiters.remove(thread_info)
95-
# if self._waiters:
96-
# self._debug_logger.info(
97-
# f": NEXT: {len(self._waiters)} threads still waiting: {self._waiters}"
98-
# )
99-
# self._lock.release()
100-
101-
# def __enter__(self):
102-
# self.acquire()
103-
# return self
104-
105-
# def __exit__(self, exc_type, exc_val, exc_tb):
106-
# self.release()
107-
108-
10934
class TelemetryHelper:
11035
"""Helper class for getting telemetry related information."""
11136

@@ -430,10 +355,7 @@ class TelemetryClientFactory:
430355
] = {} # Map of session_id_hex -> BaseTelemetryClient
431356
_executor: Optional[ThreadPoolExecutor] = None
432357
_initialized: bool = False
433-
_lock = threading.Lock() # Thread safety for factory operations
434-
# _lock = DebugLock(
435-
# "TelemetryClientFactory"
436-
# ) # Thread safety for factory operations with debugging
358+
_lock = threading.RLock() # Thread safety for factory operations
437359
_original_excepthook = None
438360
_excepthook_installed = False
439361

@@ -465,7 +387,6 @@ def _install_exception_hook(cls):
465387
def _handle_unhandled_exception(cls, exc_type, exc_value, exc_traceback):
466388
"""Handle unhandled exceptions by sending telemetry and flushing thread pool"""
467389
logger.debug("Handling unhandled exception: %s", exc_type.__name__)
468-
print("Handling unhandled exception: %s", exc_type.__name__)
469390
clients_to_close = list(cls._clients.values())
470391
for client in clients_to_close:
471392
client.close()
@@ -483,36 +404,15 @@ def initialize_telemetry_client(
483404
):
484405
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
485406
try:
486-
print(
487-
"\nWAITING: Initializing telemetry client: %s",
488-
session_id_hex,
489-
flush=True,
490-
)
491407
with TelemetryClientFactory._lock:
492-
print(
493-
"\nACQUIRED: Initializing telemetry client, got lock: %s",
494-
session_id_hex,
495-
flush=True,
496-
)
497408
TelemetryClientFactory._initialize()
498-
print(
499-
"\n TelemetryClientFactory initialized: %s",
500-
session_id_hex,
501-
flush=True,
502-
)
503409

504410
if session_id_hex not in TelemetryClientFactory._clients:
505-
print(
506-
"\n Session ID not in clients: %s",
507-
session_id_hex,
508-
flush=True,
509-
)
510411
logger.debug(
511412
"Creating new TelemetryClient for connection %s",
512413
session_id_hex,
513414
)
514415
if telemetry_enabled:
515-
print("\n Telemetry enabled: %s", session_id_hex, flush=True)
516416
TelemetryClientFactory._clients[
517417
session_id_hex
518418
] = TelemetryClient(
@@ -522,41 +422,11 @@ def initialize_telemetry_client(
522422
host_url=host_url,
523423
executor=TelemetryClientFactory._executor,
524424
)
525-
print(
526-
"\n Telemetry client initialized: %s",
527-
session_id_hex,
528-
flush=True,
529-
)
530425
else:
531-
print(
532-
"\n Telemetry disabled: %s", session_id_hex, flush=True
533-
)
534426
TelemetryClientFactory._clients[
535427
session_id_hex
536428
] = NoopTelemetryClient()
537-
print(
538-
"\n Noop Telemetry client initialized: %s",
539-
session_id_hex,
540-
flush=True,
541-
)
542-
else:
543-
print(
544-
"\n Session ID already in clients: %s",
545-
session_id_hex,
546-
flush=True,
547-
)
548-
print(
549-
"\nRELEASED: Telemetry client initialized: %s",
550-
session_id_hex,
551-
flush=True,
552-
)
553429
except Exception as e:
554-
print(
555-
"\nERROR: Failed to initialize telemetry client: %s due to %s",
556-
session_id_hex,
557-
e,
558-
flush=True,
559-
)
560430
logger.debug("Failed to initialize telemetry client: %s", e)
561431
# Fallback to NoopTelemetryClient to ensure connection doesn't fail
562432
TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient()
@@ -580,13 +450,7 @@ def get_telemetry_client(session_id_hex):
580450
@staticmethod
581451
def close(session_id_hex):
582452
"""Close and remove the telemetry client for a specific connection"""
583-
print("\nWAITING: Closing telemetry client: %s", session_id_hex, flush=True)
584453
with TelemetryClientFactory._lock:
585-
print(
586-
"\nACQUIRED: Closing telemetry client, got lock: %s",
587-
session_id_hex,
588-
flush=True,
589-
)
590454
if (
591455
telemetry_client := TelemetryClientFactory._clients.pop(
592456
session_id_hex, None
@@ -602,16 +466,6 @@ def close(session_id_hex):
602466
logger.debug(
603467
"No more telemetry clients, shutting down thread pool executor"
604468
)
605-
print(
606-
"\nSHUTDOWN: Shutting down thread pool executor: %s",
607-
session_id_hex,
608-
flush=True,
609-
)
610469
TelemetryClientFactory._executor.shutdown(wait=True)
611470
TelemetryClientFactory._executor = None
612471
TelemetryClientFactory._initialized = False
613-
print(
614-
"\nRELEASED: Thread pool executor shut down: %s",
615-
session_id_hex,
616-
flush=True,
617-
)

0 commit comments

Comments
 (0)