From b79ca86e3e4f41ac3cf69b84a242bf0fd1e3970e Mon Sep 17 00:00:00 2001
From: Sreekanth Vadigi
Date: Mon, 21 Jul 2025 19:07:46 +0530
Subject: [PATCH 1/3] streaming ingestion support for PUT operation

Signed-off-by: Sreekanth Vadigi
---
 examples/streaming_put.py               |  36 +++++
 src/databricks/sql/client.py            | 181 +++++++++++++++------
 tests/e2e/common/streaming_put_tests.py |  51 ++++++
 tests/e2e/test_driver.py                |   3 +-
 tests/unit/test_streaming_put.py        | 201 ++++++++++++++++++++++++
 5 files changed, 424 insertions(+), 48 deletions(-)
 create mode 100644 examples/streaming_put.py
 create mode 100644 tests/e2e/common/streaming_put_tests.py
 create mode 100644 tests/unit/test_streaming_put.py

diff --git a/examples/streaming_put.py b/examples/streaming_put.py
new file mode 100644
index 000000000..3f4aeef90
--- /dev/null
+++ b/examples/streaming_put.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+"""
+Simple example of streaming PUT operations.
+
+This demonstrates the basic usage of streaming PUT with the __input_stream__ token.
+"""
+
+import io
+import os
+from databricks import sql
+
+def main():
+    """Simple streaming PUT example."""
+
+    # Connect to Databricks
+    connection = sql.connect(
+        server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
+        http_path=os.getenv("DATABRICKS_HTTP_PATH"),
+        access_token=os.getenv("DATABRICKS_TOKEN"),
+    )
+
+    with connection.cursor() as cursor:
+        # Create a simple data stream
+        data = b"Hello, streaming world!"
+        stream = io.BytesIO(data)
+
+        # Upload to Unity Catalog volume
+        cursor.execute(
+            "PUT '__input_stream__' INTO '/Volumes/my_catalog/my_schema/my_volume/hello.txt'",
+            input_stream=stream
+        )
+
+    print("File uploaded successfully!")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index e4166f117..91cb9a759 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -1,5 +1,5 @@
 import time
-from typing import Dict, Tuple, List, Optional, Any, Union, Sequence
+from typing import Dict, Tuple, List, Optional, Any, Union, Sequence, BinaryIO
 import pandas
 
 try:
@@ -455,6 +455,7 @@ def __init__(
         self.active_command_id = None
         self.escaper = ParamEscaper()
         self.lastrowid = None
+        self._input_stream_data = None
 
         self.ASYNC_DEFAULT_POLLING_INTERVAL = 2
 
@@ -625,6 +626,33 @@ def _handle_staging_operation(
 
         is not descended from staging_allowed_local_path.
         """
+        assert self.active_result_set is not None
+        row = self.active_result_set.fetchone()
+        assert row is not None
+
+        # Parse headers
+        headers = (
+            json.loads(row.headers) if isinstance(row.headers, str) else row.headers
+        )
+        headers = dict(headers) if headers else {}
+
+        # Handle __input_stream__ token for PUT operations
+        if (
+            row.operation == "PUT" and
+            getattr(row, "localFile", None) == "__input_stream__"
+        ):
+            if not self._input_stream_data:
+                raise ProgrammingError(
+                    "No input stream provided for streaming operation",
+                    session_id_hex=self.connection.get_session_id_hex()
+                )
+            return self._handle_staging_put_stream(
+                presigned_url=row.presignedUrl,
+                stream=self._input_stream_data,
+                headers=headers
+            )
+
+        # For non-streaming operations, validate staging_allowed_local_path
         if isinstance(staging_allowed_local_path, type(str())):
             _staging_allowed_local_paths = [staging_allowed_local_path]
         elif isinstance(staging_allowed_local_path, type(list())):
@@ -639,10 +667,6 @@ def _handle_staging_operation(
             os.path.abspath(i) for i in _staging_allowed_local_paths
         ]
 
-        assert self.active_result_set is not None
-        row = self.active_result_set.fetchone()
-        assert row is not None
-
         # Must set to None in cases where server response does not include localFile
         abs_localFile = None
 
@@ -665,15 +689,10 @@ def _handle_staging_operation(
                 session_id_hex=self.connection.get_session_id_hex(),
             )
 
-        # May be real headers, or could be json string
-        headers = (
-            json.loads(row.headers) if isinstance(row.headers, str) else row.headers
-        )
-
         handler_args = {
             "presigned_url": row.presignedUrl,
             "local_file": abs_localFile,
-            "headers": dict(headers) or {},
+            "headers": headers,
         }
 
         logger.debug(
@@ -696,6 +715,60 @@ def _handle_staging_operation(
             session_id_hex=self.connection.get_session_id_hex(),
         )
 
+    @log_latency(StatementType.SQL)
+    def _handle_staging_put_stream(
+        self,
+        presigned_url: str,
+        stream: BinaryIO,
+        headers: Optional[dict] = None,
+    ) -> None:
+        """Handle PUT operation with streaming data.
+
+        Args:
+            presigned_url: The presigned URL for upload
+            stream: Binary stream to upload
+            headers: Optional HTTP headers
+
+        Raises:
+            OperationalError: If the upload fails
+        """
+
+        # Prepare headers
+        http_headers = dict(headers) if headers else {}
+
+        try:
+            # Stream directly to presigned URL
+            response = requests.put(
+                url=presigned_url,
+                data=stream,
+                headers=http_headers,
+                timeout=300 # 5 minute timeout
+            )
+
+            # Check response codes
+            OK = requests.codes.ok # 200
+            CREATED = requests.codes.created # 201
+            ACCEPTED = requests.codes.accepted # 202
+            NO_CONTENT = requests.codes.no_content # 204
+
+            if response.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
+                raise OperationalError(
+                    f"Staging operation over HTTP was unsuccessful: {response.status_code}-{response.text}",
+                    session_id_hex=self.connection.get_session_id_hex()
+                )
+
+            if response.status_code == ACCEPTED:
+                logger.debug(
+                    f"Response code {ACCEPTED} from server indicates upload was accepted "
+                    "but not yet applied on the server. It's possible this command may fail later."
+                )
+
+        except requests.exceptions.RequestException as e:
+            raise OperationalError(
+                f"HTTP request failed during stream upload: {str(e)}",
+                session_id_hex=self.connection.get_session_id_hex()
+            ) from e
+
     @log_latency(StatementType.SQL)
     def _handle_staging_put(
         self, presigned_url: str, local_file: str, headers: Optional[dict] = None
@@ -783,6 +856,7 @@ def execute(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        input_stream: Optional[BinaryIO] = None,
         enforce_embedded_schema_correctness=False,
     ) -> "Cursor":
         """
@@ -820,47 +894,60 @@ def execute(
 
         logger.debug(
             "Cursor.execute(operation=%s, parameters=%s)", operation, parameters
         )
+        try:
+            # Store stream data if provided
+            self._input_stream_data = None
+            if input_stream is not None:
+                # Validate stream has required methods
+                if not hasattr(input_stream, 'read'):
+                    raise TypeError(
+                        "input_stream must be a binary stream with read() method"
+                    )
+                self._input_stream_data = input_stream
 
-        param_approach = self._determine_parameter_approach(parameters)
-        if param_approach == ParameterApproach.NONE:
-            prepared_params = NO_NATIVE_PARAMS
-            prepared_operation = operation
+            param_approach = self._determine_parameter_approach(parameters)
+            if param_approach == ParameterApproach.NONE:
+                prepared_params = NO_NATIVE_PARAMS
+                prepared_operation = operation
 
-        elif param_approach == ParameterApproach.INLINE:
-            prepared_operation, prepared_params = self._prepare_inline_parameters(
-                operation, parameters
-            )
-        elif param_approach == ParameterApproach.NATIVE:
-            normalized_parameters = self._normalize_tparametercollection(parameters)
-            param_structure = self._determine_parameter_structure(normalized_parameters)
-            transformed_operation = transform_paramstyle(
-                operation, normalized_parameters, param_structure
-            )
-            prepared_operation, prepared_params = self._prepare_native_parameters(
-                transformed_operation, normalized_parameters, param_structure
-            )
-
-        self._check_not_closed()
-        self._close_and_clear_active_result_set()
-        self.active_result_set = self.backend.execute_command(
-            operation=prepared_operation,
-            session_id=self.connection.session.session_id,
-            max_rows=self.arraysize,
-            max_bytes=self.buffer_size_bytes,
-            lz4_compression=self.connection.lz4_compression,
-            cursor=self,
-            use_cloud_fetch=self.connection.use_cloud_fetch,
-            parameters=prepared_params,
-            async_op=False,
-            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
-        )
+            elif param_approach == ParameterApproach.INLINE:
+                prepared_operation, prepared_params = self._prepare_inline_parameters(
+                    operation, parameters
+                )
+            elif param_approach == ParameterApproach.NATIVE:
+                normalized_parameters = self._normalize_tparametercollection(parameters)
+                param_structure = self._determine_parameter_structure(normalized_parameters)
+                transformed_operation = transform_paramstyle(
+                    operation, normalized_parameters, param_structure
+                )
+                prepared_operation, prepared_params = self._prepare_native_parameters(
+                    transformed_operation, normalized_parameters, param_structure
+                )
 
-        if self.active_result_set and self.active_result_set.is_staging_operation:
-            self._handle_staging_operation(
-                staging_allowed_local_path=self.connection.staging_allowed_local_path
+            self._check_not_closed()
+            self._close_and_clear_active_result_set()
+            self.active_result_set = self.backend.execute_command(
+                operation=prepared_operation,
+                session_id=self.connection.session.session_id,
+                max_rows=self.arraysize,
+                max_bytes=self.buffer_size_bytes,
+                lz4_compression=self.connection.lz4_compression,
+                cursor=self,
+                use_cloud_fetch=self.connection.use_cloud_fetch,
+                parameters=prepared_params,
+                async_op=False,
+                enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
             )
-        return self
+            if self.active_result_set and self.active_result_set.is_staging_operation:
+                self._handle_staging_operation(
+                    staging_allowed_local_path=self.connection.staging_allowed_local_path
+                )
+
+            return self
+        finally:
+            # Clean up stream data
+            self._input_stream_data = None
 
     @log_latency(StatementType.QUERY)
     def execute_async(
diff --git a/tests/e2e/common/streaming_put_tests.py b/tests/e2e/common/streaming_put_tests.py
new file mode 100644
index 000000000..7b69e1ac5
--- /dev/null
+++ b/tests/e2e/common/streaming_put_tests.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+"""
+E2E tests for streaming PUT operations.
+"""
+
+import io
+import pytest
+from datetime import datetime
+
+
+class PySQLStreamingPutTestSuiteMixin:
+    """Test suite for streaming PUT operations."""
+
+    def test_streaming_put_basic(self, catalog, schema):
+        """Test basic streaming PUT functionality."""
+
+        # Create test data
+        test_data = b"Hello, streaming world! This is test data."
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"stream_test_{timestamp}.txt"
+
+        with self.connection() as conn:
+            with conn.cursor() as cursor:
+                with io.BytesIO(test_data) as stream:
+                    cursor.execute(
+                        f"PUT '__input_stream__' INTO '/Volumes/{catalog}/{schema}/e2etests/{filename}'",
+                        input_stream=stream
+                    )
+
+                # Verify file exists
+                cursor.execute(f"LIST '/Volumes/{catalog}/{schema}/e2etests/'")
+                files = cursor.fetchall()
+
+                # Check if our file is in the list
+                file_paths = [row[0] for row in files]
+                expected_path = f"/Volumes/{catalog}/{schema}/e2etests/{filename}"
+
+                assert expected_path in file_paths, f"File {expected_path} not found in {file_paths}"
+
+
+    def test_streaming_put_missing_stream(self, catalog, schema):
+        """Test that missing stream raises appropriate error."""
+
+        with self.connection() as conn:
+            with conn.cursor() as cursor:
+                # Test without providing stream
+                with pytest.raises(Exception):  # Should fail
+                    cursor.execute(
+                        f"PUT '__input_stream__' INTO '/Volumes/{catalog}/{schema}/e2etests/test.txt'"
+                        # Note: No input_stream parameter
+                    )
\ No newline at end of file
diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index 042fcc10a..7a7041094 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -47,8 +47,8 @@
 )
 from tests.e2e.common.staging_ingestion_tests import PySQLStagingIngestionTestSuiteMixin
 from tests.e2e.common.retry_test_mixins import PySQLRetryTestsMixin
-
 from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
+from tests.e2e.common.streaming_put_tests import PySQLStreamingPutTestSuiteMixin
 
 from databricks.sql.exc import SessionAlreadyClosedError
 
@@ -256,6 +256,7 @@ class TestPySQLCoreSuite(
     PySQLStagingIngestionTestSuiteMixin,
     PySQLRetryTestsMixin,
     PySQLUCVolumeTestSuiteMixin,
+    PySQLStreamingPutTestSuiteMixin,
 ):
     validate_row_value_type = True
     validate_result = True
diff --git a/tests/unit/test_streaming_put.py b/tests/unit/test_streaming_put.py
new file mode 100644
index 000000000..180f444d2
--- /dev/null
+++ b/tests/unit/test_streaming_put.py
@@ -0,0 +1,201 @@
+
+import io
+import unittest
+from unittest.mock import patch, Mock, MagicMock
+import databricks.sql.client as client
+from databricks.sql import ProgrammingError
+import requests
+
+
+class TestStreamingPutUnit(unittest.TestCase):
+    """Unit tests for streaming PUT functionality."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        self.mock_connection = Mock()
+        self.mock_backend = Mock()
+        self.cursor = client.Cursor(
+            connection=self.mock_connection,
+            backend=self.mock_backend
+        )
+
+    def _setup_mock_staging_put_stream_response(self):
+        """Helper method to set up mock staging PUT stream response."""
+        mock_result_set = Mock()
+        mock_result_set.is_staging_operation = True
+        self.mock_backend.execute_command.return_value = mock_result_set
+
+        mock_row = Mock()
+        mock_row.operation = "PUT"
+        mock_row.localFile = "__input_stream__"
+        mock_row.presignedUrl = "https://example.com/upload"
+        mock_row.headers = "{}"
+        mock_result_set.fetchone.return_value = mock_row
+
+        return mock_result_set
+
+
+    def test_execute_with_valid_stream(self):
+        """Test execute method with valid input stream."""
+
+        # Mock the backend response
+        self._setup_mock_staging_put_stream_response()
+
+        # Test with valid stream
+        test_stream = io.BytesIO(b"test data")
+
+        with patch.object(self.cursor, '_handle_staging_put_stream') as mock_handler:
+            self.cursor.execute(
+                "PUT '__input_stream__' INTO '/Volumes/test/cat/schema/vol/file.txt'",
+                input_stream=test_stream
+            )
+
+            # Verify staging handler was called
+            mock_handler.assert_called_once()
+
+        # Verify the finally block cleanup
+        self.assertIsNone(self.cursor._input_stream_data)
+
+
+    def test_execute_with_invalid_stream_types(self):
+        """Test execute method rejects all invalid stream types."""
+
+        # Test all invalid input types in one place
+        invalid_inputs = [
+            "not a stream",
+            [1, 2, 3],
+            {"key": "value"},
+            42,
+            True
+        ]
+
+        for invalid_input in invalid_inputs:
+            with self.subTest(invalid_input=invalid_input):
+                with self.assertRaises(TypeError) as context:
+                    self.cursor.execute(
+                        "PUT '__input_stream__' INTO '/Volumes/test/cat/schema/vol/file.txt'",
+                        input_stream=invalid_input
+                    )
+                error_msg = str(context.exception)
+                self.assertIn("input_stream must be a binary stream", error_msg)
+
+
+    def test_execute_with_none_stream_for_staging_put(self):
+        """Test execute method rejects None stream for streaming PUT operations."""
+
+        # Mock staging operation response for None case
+        self._setup_mock_staging_put_stream_response()
+
+        # None with __input_stream__ raises ProgrammingError
+        with self.assertRaises(client.ProgrammingError) as context:
+            self.cursor.execute(
+                "PUT '__input_stream__' INTO '/Volumes/test/cat/schema/vol/file.txt'",
+                input_stream=None
+            )
+        error_msg = str(context.exception)
+        self.assertIn("No input stream provided for streaming operation", error_msg)
+
+
+    def test_handle_staging_put_stream_success(self):
+        """Test successful streaming PUT operation."""
+
+        test_stream = io.BytesIO(b"test data")
+        presigned_url = "https://example.com/upload"
+        headers = {"Content-Type": "text/plain"}
+
+        with patch('requests.put') as mock_put:
+            mock_response = Mock()
+            mock_response.status_code = 200
+            mock_put.return_value = mock_response
+
+            self.cursor._handle_staging_put_stream(
+                presigned_url=presigned_url,
+                stream=test_stream,
+                headers=headers
+            )
+
+            # Verify HTTP PUT was called correctly
+            mock_put.assert_called_once_with(
+                url=presigned_url,
+                data=test_stream,
+                headers=headers,
+                timeout=300
+            )
+
+
+    def test_handle_staging_put_stream_http_error(self):
+        """Test streaming PUT operation with HTTP error."""
+
+        test_stream = io.BytesIO(b"test data")
+        presigned_url = "https://example.com/upload"
+
+        with patch('requests.put') as mock_put:
+            mock_response = Mock()
+            mock_response.status_code = 500
+            mock_response.text = "Internal Server Error"
+            mock_put.return_value = mock_response
+
+            with self.assertRaises(client.OperationalError) as context:
+                self.cursor._handle_staging_put_stream(
+                    presigned_url=presigned_url,
+                    stream=test_stream
+                )
+
+            # Check for the actual error message format
+            self.assertIn("500", str(context.exception))
+
+
+    def test_handle_staging_put_stream_network_error(self):
+        """Test streaming PUT operation with network error."""
+
+        test_stream = io.BytesIO(b"test data")
+        presigned_url = "https://example.com/upload"
+
+        with patch('requests.put') as mock_put:
+            # Use requests.exceptions.RequestException instead of generic Exception
+            mock_put.side_effect = requests.exceptions.RequestException("Network error")
+
+            with self.assertRaises(client.OperationalError) as context:
+                self.cursor._handle_staging_put_stream(
+                    presigned_url=presigned_url,
+                    stream=test_stream
+                )
+
+            self.assertIn("HTTP request failed", str(context.exception))
+
+
+    def test_stream_cleanup_after_execute(self):
+        """Test that stream data is cleaned up after execute."""
+
+        # Mock the backend response
+        mock_result_set = Mock()
+        mock_result_set.is_staging_operation = False
+        self.mock_backend.execute_command.return_value = mock_result_set
+
+        test_stream = io.BytesIO(b"test data")
+
+        # Execute with stream
+        self.cursor.execute("SELECT 1", input_stream=test_stream)
+
+        # Verify stream data is cleaned up
+        self.assertIsNone(self.cursor._input_stream_data)
+
+
+    def test_stream_cleanup_after_exception(self):
+        """Test that stream data is cleaned up even after exception."""
+
+        # Mock the backend to raise exception
+        self.mock_backend.execute_command.side_effect = Exception("Backend error")
+
+        test_stream = io.BytesIO(b"test data")
+
+        # Execute should raise exception but cleanup should still happen
+        with self.assertRaises(Exception):
+            self.cursor.execute("SELECT 1", input_stream=test_stream)
+
+        # Verify stream data is still cleaned up
+        self.assertIsNone(self.cursor._input_stream_data)
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file

From 4cd2cee3d793760ebab964191136bbd19b33d399 Mon Sep 17 00:00:00 2001
From: Sreekanth Vadigi
Date: Mon, 21 Jul 2025 19:28:20 +0530
Subject: [PATCH 2/3] code formatter

Signed-off-by: Sreekanth Vadigi
---
 src/databricks/sql/client.py | 44 +++++++++++++++++++-----------------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 91cb9a759..d2d6639bf 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -638,18 +638,18 @@ def _handle_staging_operation(
 
         # Handle __input_stream__ token for PUT operations
         if (
-            row.operation == "PUT" and
-            getattr(row, "localFile", None) == "__input_stream__"
+            row.operation == "PUT"
+            and getattr(row, "localFile", None) == "__input_stream__"
         ):
             if not self._input_stream_data:
                 raise ProgrammingError(
                     "No input stream provided for streaming operation",
-                    session_id_hex=self.connection.get_session_id_hex()
+                    session_id_hex=self.connection.get_session_id_hex(),
                 )
             return self._handle_staging_put_stream(
                 presigned_url=row.presignedUrl,
                 stream=self._input_stream_data,
-                headers=headers
+                headers=headers,
             )
 
         # For non-streaming operations, validate staging_allowed_local_path
@@ -723,50 +723,50 @@ def _handle_staging_put_stream(
         headers: Optional[dict] = None,
     ) -> None:
         """Handle PUT operation with streaming data.
-        
+
         Args:
             presigned_url: The presigned URL for upload
             stream: Binary stream to upload
            headers: Optional HTTP headers
-        
+
         Raises:
             OperationalError: If the upload fails
         """
-        
+
         # Prepare headers
         http_headers = dict(headers) if headers else {}
-        
+
         try:
             # Stream directly to presigned URL
             response = requests.put(
                 url=presigned_url,
                 data=stream,
                 headers=http_headers,
-                timeout=300 # 5 minute timeout
+                timeout=300,  # 5 minute timeout
             )
-        
+
             # Check response codes
-            OK = requests.codes.ok # 200
-            CREATED = requests.codes.created # 201
-            ACCEPTED = requests.codes.accepted # 202
-            NO_CONTENT = requests.codes.no_content # 204
-        
+            OK = requests.codes.ok  # 200
+            CREATED = requests.codes.created  # 201
+            ACCEPTED = requests.codes.accepted  # 202
+            NO_CONTENT = requests.codes.no_content  # 204
+
             if response.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
                 raise OperationalError(
                     f"Staging operation over HTTP was unsuccessful: {response.status_code}-{response.text}",
-                    session_id_hex=self.connection.get_session_id_hex()
+                    session_id_hex=self.connection.get_session_id_hex(),
                 )
-        
+
             if response.status_code == ACCEPTED:
                 logger.debug(
                     f"Response code {ACCEPTED} from server indicates upload was accepted "
                     "but not yet applied on the server. It's possible this command may fail later."
                 )
-        
+
         except requests.exceptions.RequestException as e:
             raise OperationalError(
                 f"HTTP request failed during stream upload: {str(e)}",
-                session_id_hex=self.connection.get_session_id_hex()
+                session_id_hex=self.connection.get_session_id_hex(),
             ) from e
 
     @log_latency(StatementType.SQL)
@@ -899,7 +899,7 @@ def execute(
             self._input_stream_data = None
             if input_stream is not None:
                 # Validate stream has required methods
-                if not hasattr(input_stream, 'read'):
+                if not hasattr(input_stream, "read"):
                     raise TypeError(
                         "input_stream must be a binary stream with read() method"
                     )
@@ -916,7 +916,9 @@ def execute(
                 )
             elif param_approach == ParameterApproach.NATIVE:
                 normalized_parameters = self._normalize_tparametercollection(parameters)
-                param_structure = self._determine_parameter_structure(normalized_parameters)
+                param_structure = self._determine_parameter_structure(
+                    normalized_parameters
+                )
                 transformed_operation = transform_paramstyle(
                     operation, normalized_parameters, param_structure
                 )

From 8ae220cac751052dd340795f073795919825d56d Mon Sep 17 00:00:00 2001
From: Sreekanth Vadigi
Date: Tue, 22 Jul 2025 12:18:20 +0530
Subject: [PATCH 3/3] type error fix

Signed-off-by: Sreekanth Vadigi
---
 src/databricks/sql/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index d2d6639bf..0893afbd5 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -455,7 +455,7 @@ def __init__(
         self.active_command_id = None
         self.escaper = ParamEscaper()
         self.lastrowid = None
-        self._input_stream_data = None
+        self._input_stream_data: Optional[BinaryIO] = None
 
         self.ASYNC_DEFAULT_POLLING_INTERVAL = 2
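Usage note (an illustrative sketch, not part of the patch series above): execute() only checks that input_stream exposes a read() method, and _handle_staging_put_stream() hands the object straight to requests.put(data=stream), so any binary file-like object should work; requests reads such bodies in chunks rather than buffering the whole payload in memory. The catalog, schema, volume, and file names below are placeholders:

    import os
    from databricks import sql

    connection = sql.connect(
        server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
        http_path=os.getenv("DATABRICKS_HTTP_PATH"),
        access_token=os.getenv("DATABRICKS_TOKEN"),
    )

    with connection.cursor() as cursor:
        # Stream a large local file to the volume without first loading it
        # into memory; the cursor forwards the open file object directly to
        # the presigned-URL upload.
        with open("large_dataset.csv", "rb") as f:
            cursor.execute(
                "PUT '__input_stream__' INTO '/Volumes/my_catalog/my_schema/my_volume/large_dataset.csv'",
                input_stream=f,
            )

Because execute() clears _input_stream_data in its finally block, a stream is consumed by exactly one call; supply a fresh stream object for each PUT.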