@@ -184,19 +184,16 @@ def nan_safe_dtype(dtype, value):
184184 for csv_name in self .key_columns :
185185 dataframe .loc [:, csv_name ] = dataframe [csv_name ].map (self .columns_and_types [csv_name ].dtype )
186186
187- num_columns = 2 + len (dataframe_columns_and_types ) + len (self .additional_fields )
188- value_placeholders = ', ' .join (['%s' ] * num_columns )
189187 col_names = [f'`{ i .sql_name } `' for i in dataframe_columns_and_types + self .additional_fields ]
190- columns = ', ' .join (col_names )
191- updates = ', ' .join (f'{ c } =new_values.{ c } ' for c in col_names )
192- # NOTE: list in `updates` presumes `publication_col_name` is part of the unique key and thus not needed in UPDATE
193- sql = f'INSERT INTO `{ self .table_name } ` (`id`, `{ self .publication_col_name } `, { columns } ) ' \
194- f'VALUES ({ value_placeholders } ) AS new_values ' \
195- f'ON DUPLICATE KEY UPDATE { updates } '
188+ value_placeholders = ', ' .join (['%s' ] * (2 + len (col_names ))) # extra 2 for `id` and `self.publication_col_name` cols
189+ columnstring = ', ' .join (col_names )
190+ sql = f'REPLACE INTO `{ self .table_name } ` (`id`, `{ self .publication_col_name } `, { columnstring } ) VALUES ({ value_placeholders } )'
196191 id_and_publication_date = (0 , publication_date )
192+ num_values = len (dataframe .index )
197193 if logger :
198- logger .info ('updating values' , count = len ( dataframe . index ) )
194+ logger .info ('updating values' , count = num_values )
199195 n = 0
196+ rows_affected = 0
200197 many_values = []
201198 with self .new_cursor () as cursor :
202199 for index , row in dataframe .iterrows ():
@@ -212,6 +209,7 @@ def nan_safe_dtype(dtype, value):
212209 if n % 5_000 == 0 :
213210 try :
214211 cursor .executemany (sql , many_values )
212+ rows_affected += cursor .rowcount
215213 many_values = []
216214 except Exception as e :
217215 if logger :
@@ -220,6 +218,11 @@ def nan_safe_dtype(dtype, value):
220218 # insert final batch
221219 if many_values :
222220 cursor .executemany (sql , many_values )
221+ rows_affected += cursor .rowcount
222+ if logger :
223+ # NOTE: REPLACE INTO marks 2 rows affected for a "replace" (one for a delete and one for a re-insert)
224+ # which allows us to count rows which were updated
225+ logger .info ('rows affected' , total = rows_affected , updated = rows_affected - num_values )
223226
224227 # deal with non/seldomly updated columns used like a fk table (if this database needs it)
225228 if hasattr (self , 'AGGREGATE_KEY_COLS' ):
0 commit comments