-
Notifications
You must be signed in to change notification settings - Fork 96
Description
Custom function is not working
`from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, col, from_json, explode_outer, expr
from pyspark.sql.types import StructType, ArrayType, MapType, VariantType
from src.Config import Config
from src.dataflow_pipeline import DataflowPipeline
def event_hub_parsing(df: DataFrame, _dataFlowSpec) -> DataFrame:
    """Parse raw Event Hub payloads into typed bronze columns.

    Casts the binary ``value`` column to a JSON string, parses it with
    ``parse_json``, and projects the event fields plus Event Hub metadata
    and ETL audit columns. Only applies when the dataflow spec's source
    format is ``eventhub``; otherwise the DataFrame is returned unchanged.

    Args:
        df: Incoming streaming DataFrame with Event Hub columns
            (``value``, ``partition``, ``timestamp``).
        _dataFlowSpec: DLT-meta dataflow spec; only ``sourceFormat`` is read.

    Returns:
        The transformed (or untouched) DataFrame.
    """
    source_format = _dataFlowSpec.sourceFormat
    if source_format == 'eventhub':
        # BUG FIX: the chained .withColumn calls must be wrapped in
        # parentheses — dangling ".withColumn(...)" continuation lines are
        # a SyntaxError, so the original function never loaded/executed.
        df = (
            df.withColumn("records", col("value").cast("string"))
            .withColumn("parsed_records", expr("parse_json(records)"))
            .withColumn("eventdate", expr("to_timestamp(parsed_records:eventdate)"))
            .withColumn("eventtype", expr("CAST(parsed_records:eventtype AS STRING)"))
            .withColumn("eventstatus", expr("CAST(parsed_records:eventstatus AS STRING)"))
            .withColumn("eventcreationtype", expr("CAST(parsed_records:eventcreationtype AS STRING)"))
            .withColumn("eh_partition", expr("CAST(partition as int)"))
            .withColumn("eh_enqueued_timestamp", expr("timestamp"))
            # NOTE(review): named "_date" but produces a timestamp — confirm
            # whether to_date() was intended here.
            .withColumn("eh_enqueued_date", expr("to_timestamp(timestamp)"))
            # BUG FIX: current_timestamp was never imported from
            # pyspark.sql.functions; expr() avoids needing the import.
            .withColumn("etl_processed_timestamp", expr("current_timestamp()"))
            .withColumn("etl_rec_uuid", expr("uuid()"))
        )
    return df
# Read the target layer from Spark conf and launch the DLT pipeline with the
# custom bronze transform attached.
# BUG FIX: these two statements were fused onto one physical line, which is
# a SyntaxError; each statement must be on its own line.
layer = spark.conf.get("layer", None)
DataflowPipeline.invoke_dlt_pipeline(spark, layer, bronze_custom_transform_func=event_hub_parsing)
The final table is still missing the columns added by the custom function; I assume the function is never executed.