Skip to content

Bring Your Own Custom Transformation Function not triggering #227

@junhonglau

Description

@junhonglau

Custom function is not working

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, col, from_json, explode_outer, expr, current_timestamp
from pyspark.sql.types import StructType, ArrayType, MapType, VariantType
from src.Config import Config
from src.dataflow_pipeline import DataflowPipeline


def event_hub_parsing(df: DataFrame, _dataFlowSpec) -> DataFrame:
    """Add parsed event and ETL audit columns to an Event Hubs bronze DataFrame.

    When ``_dataFlowSpec.sourceFormat`` equals ``'eventhub'``, the ``value``
    column is cast to a string, parsed with ``parse_json``, and selected
    fields are extracted into typed columns. Any other source format is
    returned untouched.

    Args:
        df: Input DataFrame. The eventhub branch reads the ``value``,
            ``partition`` and ``timestamp`` columns
            (presumably the Event Hubs/Kafka source schema -- confirm).
        _dataFlowSpec: Dataflow specification object; only its
            ``sourceFormat`` attribute is read here.

    Returns:
        The DataFrame with the extra columns for eventhub sources, otherwise
        the input DataFrame unchanged.
    """
    source_format = _dataFlowSpec.sourceFormat
    # NOTE(review): the comparison is case-sensitive; confirm the spec stores
    # the format exactly as 'eventhub', otherwise this branch is skipped.
    if source_format == 'eventhub':
        # Parentheses keep the method chain a single expression; lines that
        # start with a bare ".withColumn" are a syntax error on their own.
        df = (
            df.withColumn("records", col("value").cast("string"))
            .withColumn("parsed_records", expr("parse_json(records)"))
            .withColumn("eventdate", expr("to_timestamp(parsed_records:eventdate)"))
            .withColumn("eventtype", expr("CAST(parsed_records:eventtype AS STRING)"))
            .withColumn('eventstatus', expr("CAST(parsed_records:eventstatus AS STRING)"))
            .withColumn('eventcreationtype', expr("CAST(parsed_records:eventcreationtype AS STRING)"))
            .withColumn("eh_partition", expr("CAST(partition as int)"))
            .withColumn("eh_enqueued_timestamp", expr("timestamp"))
            .withColumn("eh_enqueued_date", expr("to_timestamp(timestamp)"))
            .withColumn("etl_processed_timestamp", current_timestamp())
            .withColumn("etl_rec_uuid", expr("uuid()"))
        )
    # Returned outside the branch so non-eventhub sources pass through
    # instead of yielding None.
    return df

# Read the target layer from the Spark configuration, then start the pipeline
# with the custom bronze transformation registered.
layer = spark.conf.get("layer", None)
DataflowPipeline.invoke_dlt_pipeline(spark, layer, bronze_custom_transform_func=event_hub_parsing)

The final table still lacks the columns added by the function, so I assume the function is never executed.
Image

Metadata

Metadata

Labels

bug: Something isn't working

Type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions