44import sys
55import tempfile
66import copy
7-
8-
97from pyspark .sql .functions import lit , expr
108import pyspark .sql .types as T
119from pyspark .sql import DataFrame
2018dlt = MagicMock ()
2119dlt .expect_all_or_drop = MagicMock ()
2220dlt .apply_changes_from_snapshot = MagicMock ()
23-
24-
25-
26-
27-
2821raw_delta_table_stream = MagicMock ()
2922
3023
@@ -198,12 +191,6 @@ def test_invoke_dlt_pipeline_silver_positive(self, run_dlt):
198191 f"{ database } .{ silver_dataflow_table } " ,
199192 )
200193 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
201-
202-
203-
204-
205-
206-
207194 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
208195 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
209196 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -229,7 +216,6 @@ def test_run_dlt_pipeline_silver_positive(self, read):
229216 silver_spec_map .update (source_details )
230217 silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
231218 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
232-
233219 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
234220 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
235221 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -291,9 +277,6 @@ def test_get_silver_schema_positive(self):
291277 silver_spec_map .update (source_details )
292278 silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
293279 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
294-
295-
296-
297280 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
298281 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
299282 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -318,9 +301,6 @@ def test_get_silver_schema_where_clause(self):
318301 silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
319302
320303 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
321-
322-
323-
324304 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
325305 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
326306 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -354,9 +334,6 @@ def test_read_silver_positive(self):
354334 }
355335 silver_spec_map .update (source_details )
356336 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
357-
358-
359-
360337 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
361338 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
362339 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -400,9 +377,6 @@ def test_read_silver_with_where(self, get_silver_schema):
400377 }
401378 silver_spec_map .update (source_details )
402379 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
403-
404-
405-
406380 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
407381 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
408382 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -464,7 +438,6 @@ def test_cdc_apply_changes_scd_type2(self, cdc_apply_changes):
464438 silver_dataflow_spec = SilverDataflowSpec (** silver_spec_map )
465439 silver_dataflow_spec .cdcApplyChanges = json .dumps (self .silver_cdc_apply_changes_scd2 )
466440 self .spark .sql ("CREATE DATABASE IF NOT EXISTS bronze" )
467-
468441 options = {"rescuedDataColumn" : "_rescued_data" , "inferColumnTypes" : "true" , "multiline" : True }
469442 customers_parquet_df = self .spark .read .options (** options ).json ("tests/resources/data/customers" )
470443 (customers_parquet_df .withColumn ("_rescued_data" , lit ("Test" )).write .format ("delta" )
@@ -1338,4 +1311,3 @@ def test_get_silver_schema_uc_disabled(self, mock_read_stream):
13381311 # format="delta"
13391312 # )
13401313 # mock_read_stream.load.return_value.selectExpr.assert_called_once_with(*silver_dataflow_spec.selectExp)
1341-
0 commit comments