Description
Hello,
If you have a DataFrame with a large number of fields (over 100), then when the memorysafecursorfetcher gets the rows it triggers two calls to fetchmany. When the cursor is a spark_df_cursor, fetchmany is not deterministic, and because Spark queries are evaluated lazily, the DataFrame that comes back may not contain the desired rows.
def fetchmany(self, size: int) -> tuple[tuple]:
    rows = []
    self.rowcount = self.df.count()
    spark_rows: list[Row] = self.df.offset(self.cursor_index).limit(size).collect()
    self.cursor_index += len(spark_rows)
    for spark_row in spark_rows:
        row = self.convert_spark_row_to_dbapi_row(spark_row)
        rows.append(row)
    return tuple(rows)
The problem is that self.df is re-evaluated on every call, so there is a chance that when the offset is applied on the second call, the rows describing the fields come back in a different order than they did on the first call.
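As a minimal sketch of the failure mode (the table name here is made up, and this assumes a Spark version that supports DataFrame.offset, as the existing code does):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    # DESCRIBE returns one row per field, with no guaranteed ordering.
    df = spark.sql("DESCRIBE TABLE my_wide_table")

    # Two separate actions, like the two fetchmany calls.
    page_1 = df.offset(0).limit(100).collect()
    page_2 = df.offset(100).limit(100).collect()

    # Each collect() re-runs the unordered query, so page_1 and page_2 are not
    # guaranteed to be disjoint or to cover every row exactly once.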
There are two possible solutions. The first is to rely on the fact that PySpark's DESCRIBE output on a table has known column names, and to order by one of them:
def sql_get_table_columns(
    self,
    table_name: str,
    included_columns: list[str] | None = None,
    excluded_columns: list[str] | None = None,
):
    return f"DESCRIBE {table_name} ORDER BY col_name"
The ORDER BY clause is the addition.
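For reference, Spark's DESCRIBE output has a fixed set of column names, which is what makes col_name usable as a sort key. A quick check (the table name is again made up):

    # The DESCRIBE result exposes these columns:
    spark.sql("DESCRIBE TABLE my_wide_table").columns
    # ['col_name', 'data_type', 'comment']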
The second is to add the orderBy in fetchmany itself:
def fetchmany(self, size: int) -> tuple[tuple]:
    rows = []
    self.rowcount = self.df.count()
    spark_rows: list[Row] = self.df.orderBy("col_name").offset(self.cursor_index).limit(size).collect()
    self.cursor_index += len(spark_rows)
    for spark_row in spark_rows:
        row = self.convert_spark_row_to_dbapi_row(spark_row)
        rows.append(row)
    return tuple(rows)
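With the ordering applied, paging over the same DataFrame becomes deterministic. A sketch of the expected behaviour, assuming the col_name values are unique (as they are for a plain column listing):

    df = spark.sql("DESCRIBE TABLE my_wide_table").orderBy("col_name")

    page_1 = df.offset(0).limit(100).collect()
    page_2 = df.offset(100).limit(100).collect()

    # With a total ordering on col_name, the two pages are disjoint and together
    # cover every row exactly once, even though each collect() re-runs the query.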
I believe this is the explanation for #2129, but I wasn't entirely sure, so I thought a separate issue would be best.