@@ -262,7 +262,15 @@ def executemany(self: "Cursor", operation, param_sets) -> "Cursor":
262262 self ._redshift_row_count = - 1 if - 1 in redshift_rowcounts else sum (rowcounts )
263263 return self
264264
265- def insert_data_bulk (self : "Cursor" , filename , table_name , column_indexes , column_names , delimeter ) -> "Cursor" :
265+ def insert_data_bulk (
266+ self : "Cursor" ,
267+ filename : str ,
268+ table_name : str ,
269+ parameter_indices : typing .List [int ],
270+ column_names : typing .List [str ],
271+ delimiter : str ,
272+ batch_size : int = 1 ,
273+ ) -> "Cursor" :
266274
267275 """runs a single bulk insert statement into the database.
268276 This method is native to redshift_connector.
@@ -272,40 +280,55 @@ def insert_data_bulk(self: "Cursor", filename, table_name, column_indexes, colum
272280 The name of the table to insert to.
273281 :param column_names:list
274282 The name of the columns in the table to insert to.
275- :param column_indexes:list
276- The indexes of the columns in the table to insert to.
277- :param delimeter: str
278- The delimeter to use when reading the file.
283+ :param parameter_indices:list
284+ The indexes of the columns in the file to insert to.
285+ :param delimiter: str
286+ The delimiter to use when reading the file.
287+ :param batch_size: int
288+ The number of rows to insert per insert statement. Minimum allowed value is 1.
279289 Returns
280290 -------
281291 The Cursor object used for executing the specified database operation: :class:`Cursor`
282292 """
293+ if batch_size < 1 :
294+ raise InterfaceError ("batch_size must be greater than 1" )
283295 if not self .__is_valid_table (table_name ):
284296 raise InterfaceError ("Invalid table name passed to insert_data_bulk: {}" .format (table_name ))
285297 if not self .__has_valid_columns (table_name , column_names ):
286298 raise InterfaceError ("Invalid column names passed to insert_data_bulk: {}" .format (table_name ))
287299 orig_paramstyle = self .paramstyle
288300 import csv
289301
290- if len (column_names ) != len (column_indexes ):
291- raise InterfaceError ("Column names and indexes must be the same length" )
292- sql_query = f"INSERT INTO { table_name } ("
293- sql_query += ", " .join (column_names )
294- sql_query += ") VALUES "
295- sql_param_list_template = "(" + ", " .join (["%s" ] * len (column_indexes )) + ")"
302+ if len (column_names ) != len (parameter_indices ):
303+ raise InterfaceError ("Column names and parameter indexes must be the same length" )
304+ base_stmt = f"INSERT INTO { table_name } ("
305+ base_stmt += ", " .join (column_names )
306+ base_stmt += ") VALUES "
307+ sql_param_list_template = "(" + ", " .join (["%s" ] * len (parameter_indices )) + ")"
296308 try :
297309 with open (filename ) as csv_file :
298- reader = csv .reader (csv_file , delimiter = delimeter )
310+ reader = csv .reader (csv_file , delimiter = delimiter )
299311 next (reader )
300- values_list = []
312+ values_list : typing . List [ str ] = []
301313 row_count = 0
302314 for row in reader :
303- for column_index in column_indexes :
315+ if row_count == batch_size :
316+ sql_param_lists = [sql_param_list_template ] * row_count
317+ insert_stmt = base_stmt + ", " .join (sql_param_lists ) + ";"
318+ self .execute (insert_stmt , values_list )
319+ row_count = 0
320+ values_list .clear ()
321+
322+ for column_index in parameter_indices :
304323 values_list .append (row [column_index ])
324+
305325 row_count += 1
306- sql_param_lists = [sql_param_list_template ] * row_count
307- sql_query += ", " .join (sql_param_lists ) + ";"
308- self .execute (sql_query , values_list )
326+
327+ if row_count :
328+ sql_param_lists = [sql_param_list_template ] * row_count
329+ insert_stmt = base_stmt + ", " .join (sql_param_lists ) + ";"
330+ self .execute (insert_stmt , values_list )
331+
309332 except Exception as e :
310333 raise InterfaceError (e )
311334 finally :
0 commit comments