11import time
2- from typing import Dict , Tuple , List , Optional , Any , Union , Sequence
2+ from typing import Dict , Tuple , List , Optional , Any , Union , Sequence , BinaryIO
33import pandas
44
55try :
@@ -455,6 +455,7 @@ def __init__(
455455 self .active_command_id = None
456456 self .escaper = ParamEscaper ()
457457 self .lastrowid = None
458+ self ._input_stream_data = None
458459
459460 self .ASYNC_DEFAULT_POLLING_INTERVAL = 2
460461
@@ -625,6 +626,33 @@ def _handle_staging_operation(
625626 is not descended from staging_allowed_local_path.
626627 """
627628
629+ assert self .active_result_set is not None
630+ row = self .active_result_set .fetchone ()
631+ assert row is not None
632+
633+ # Parse headers
634+ headers = (
635+ json .loads (row .headers ) if isinstance (row .headers , str ) else row .headers
636+ )
637+ headers = dict (headers ) if headers else {}
638+
639+ # Handle __input_stream__ token for PUT operations
640+ if (
641+ row .operation == "PUT" and
642+ getattr (row , "localFile" , None ) == "__input_stream__"
643+ ):
644+ if not self ._input_stream_data :
645+ raise ProgrammingError (
646+ "No input stream provided for streaming operation" ,
647+ session_id_hex = self .connection .get_session_id_hex ()
648+ )
649+ return self ._handle_staging_put_stream (
650+ presigned_url = row .presignedUrl ,
651+ stream = self ._input_stream_data ,
652+ headers = headers
653+ )
654+
655+ # For non-streaming operations, validate staging_allowed_local_path
628656 if isinstance (staging_allowed_local_path , type (str ())):
629657 _staging_allowed_local_paths = [staging_allowed_local_path ]
630658 elif isinstance (staging_allowed_local_path , type (list ())):
@@ -639,10 +667,6 @@ def _handle_staging_operation(
639667 os .path .abspath (i ) for i in _staging_allowed_local_paths
640668 ]
641669
642- assert self .active_result_set is not None
643- row = self .active_result_set .fetchone ()
644- assert row is not None
645-
646670 # Must set to None in cases where server response does not include localFile
647671 abs_localFile = None
648672
@@ -665,15 +689,10 @@ def _handle_staging_operation(
665689 session_id_hex = self .connection .get_session_id_hex (),
666690 )
667691
668- # May be real headers, or could be json string
669- headers = (
670- json .loads (row .headers ) if isinstance (row .headers , str ) else row .headers
671- )
672-
673692 handler_args = {
674693 "presigned_url" : row .presignedUrl ,
675694 "local_file" : abs_localFile ,
676- "headers" : dict ( headers ) or {} ,
695+ "headers" : headers ,
677696 }
678697
679698 logger .debug (
@@ -696,6 +715,60 @@ def _handle_staging_operation(
696715 session_id_hex = self .connection .get_session_id_hex (),
697716 )
698717
@log_latency(StatementType.SQL)
def _handle_staging_put_stream(
    self,
    presigned_url: str,
    stream: BinaryIO,
    headers: Optional[dict] = None,
) -> None:
    """Upload a binary stream to a presigned URL with one HTTP PUT.

    Args:
        presigned_url: Destination URL the PUT request is sent to.
        stream: Binary file-like object handed to ``requests.put`` as the
            request body.
        headers: Optional HTTP headers. They are copied before use; ``None``
            or an empty mapping results in an empty header dict.

    Raises:
        OperationalError: If the response status is not one of 200, 201,
            202 or 204, or if ``requests`` raises a ``RequestException``
            while the request is being made.
    """

    # Work on a private copy so the caller's mapping is never shared.
    request_headers = dict(headers or {})

    try:
        # The stream object itself is passed as the body (no read() here).
        upload_response = requests.put(
            url=presigned_url,
            data=stream,
            headers=request_headers,
            timeout=300,  # 5 minutes, passed straight through to requests
        )

        accepted = requests.codes.accepted  # 202
        success_codes = (
            requests.codes.ok,  # 200
            requests.codes.created,  # 201
            requests.codes.no_content,  # 204
            accepted,
        )
        status = upload_response.status_code

        # Anything outside the success set is reported as a failed upload.
        if status not in success_codes:
            raise OperationalError(
                f"Staging operation over HTTP was unsuccessful: {status} -{upload_response.text} ",
                session_id_hex=self.connection.get_session_id_hex(),
            )

        # 202 is treated as success, but the server has not applied it yet.
        if status == accepted:
            logger.debug(
                f"Response code {accepted} from server indicates upload was accepted "
                "but not yet applied on the server. It's possible this command may fail later."
            )

    except requests.exceptions.RequestException as request_error:
        # Re-raise transport-level failures as the driver's own error type,
        # keeping the original exception as the cause.
        raise OperationalError(
            f"HTTP request failed during stream upload: {str(request_error)} ",
            session_id_hex=self.connection.get_session_id_hex(),
        ) from request_error
771+
699772 @log_latency (StatementType .SQL )
700773 def _handle_staging_put (
701774 self , presigned_url : str , local_file : str , headers : Optional [dict ] = None
@@ -783,6 +856,7 @@ def execute(
783856 self ,
784857 operation : str ,
785858 parameters : Optional [TParameterCollection ] = None ,
859+ input_stream : Optional [BinaryIO ] = None ,
786860 enforce_embedded_schema_correctness = False ,
787861 ) -> "Cursor" :
788862 """
@@ -820,47 +894,60 @@ def execute(
820894 logger .debug (
821895 "Cursor.execute(operation=%s, parameters=%s)" , operation , parameters
822896 )
897+ try :
898+ # Store stream data if provided
899+ self ._input_stream_data = None
900+ if input_stream is not None :
901+ # Validate stream has required methods
902+ if not hasattr (input_stream , 'read' ):
903+ raise TypeError (
904+ "input_stream must be a binary stream with read() method"
905+ )
906+ self ._input_stream_data = input_stream
823907
824- param_approach = self ._determine_parameter_approach (parameters )
825- if param_approach == ParameterApproach .NONE :
826- prepared_params = NO_NATIVE_PARAMS
827- prepared_operation = operation
908+ param_approach = self ._determine_parameter_approach (parameters )
909+ if param_approach == ParameterApproach .NONE :
910+ prepared_params = NO_NATIVE_PARAMS
911+ prepared_operation = operation
828912
829- elif param_approach == ParameterApproach .INLINE :
830- prepared_operation , prepared_params = self ._prepare_inline_parameters (
831- operation , parameters
832- )
833- elif param_approach == ParameterApproach .NATIVE :
834- normalized_parameters = self ._normalize_tparametercollection (parameters )
835- param_structure = self ._determine_parameter_structure (normalized_parameters )
836- transformed_operation = transform_paramstyle (
837- operation , normalized_parameters , param_structure
838- )
839- prepared_operation , prepared_params = self ._prepare_native_parameters (
840- transformed_operation , normalized_parameters , param_structure
841- )
842-
843- self ._check_not_closed ()
844- self ._close_and_clear_active_result_set ()
845- self .active_result_set = self .backend .execute_command (
846- operation = prepared_operation ,
847- session_id = self .connection .session .session_id ,
848- max_rows = self .arraysize ,
849- max_bytes = self .buffer_size_bytes ,
850- lz4_compression = self .connection .lz4_compression ,
851- cursor = self ,
852- use_cloud_fetch = self .connection .use_cloud_fetch ,
853- parameters = prepared_params ,
854- async_op = False ,
855- enforce_embedded_schema_correctness = enforce_embedded_schema_correctness ,
856- )
913+ elif param_approach == ParameterApproach .INLINE :
914+ prepared_operation , prepared_params = self ._prepare_inline_parameters (
915+ operation , parameters
916+ )
917+ elif param_approach == ParameterApproach .NATIVE :
918+ normalized_parameters = self ._normalize_tparametercollection (parameters )
919+ param_structure = self ._determine_parameter_structure (normalized_parameters )
920+ transformed_operation = transform_paramstyle (
921+ operation , normalized_parameters , param_structure
922+ )
923+ prepared_operation , prepared_params = self ._prepare_native_parameters (
924+ transformed_operation , normalized_parameters , param_structure
925+ )
857926
858- if self .active_result_set and self .active_result_set .is_staging_operation :
859- self ._handle_staging_operation (
860- staging_allowed_local_path = self .connection .staging_allowed_local_path
927+ self ._check_not_closed ()
928+ self ._close_and_clear_active_result_set ()
929+ self .active_result_set = self .backend .execute_command (
930+ operation = prepared_operation ,
931+ session_id = self .connection .session .session_id ,
932+ max_rows = self .arraysize ,
933+ max_bytes = self .buffer_size_bytes ,
934+ lz4_compression = self .connection .lz4_compression ,
935+ cursor = self ,
936+ use_cloud_fetch = self .connection .use_cloud_fetch ,
937+ parameters = prepared_params ,
938+ async_op = False ,
939+ enforce_embedded_schema_correctness = enforce_embedded_schema_correctness ,
861940 )
862941
863- return self
942+ if self .active_result_set and self .active_result_set .is_staging_operation :
943+ self ._handle_staging_operation (
944+ staging_allowed_local_path = self .connection .staging_allowed_local_path
945+ )
946+
947+ return self
948+ finally :
949+ # Clean up stream data
950+ self ._input_stream_data = None
864951
865952 @log_latency (StatementType .QUERY )
866953 def execute_async (
0 commit comments