from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import current_timestamp
import sys
import os

def spark_script():
    print("start...................")

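    # Source/target S3 prefixes, the bucket name, and temporary AWS credentials
    # are passed in through the Lambda function's environment variables.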
    input_path = os.environ['input_path']
    target_path = os.environ['output_path']
    s3_bucket = os.environ['s3_bucket']

    aws_region = os.environ['REGION']
    aws_access_key_id = os.environ['ACCESS_KEY_ID']
    aws_secret_access_key = os.environ['SECRET_ACCESS_KEY']
    session_token = os.environ['SESSION_TOKEN']

    input_path = f"s3a://{s3_bucket}/{input_path}"
    target_path = f"s3a://{s3_bucket}/{target_path}"

    print("******* Input path:", input_path)
    print("******* Target path:", target_path)

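    # Spark runs in local mode inside the Lambda container. Hive support plus the
    # AWS Glue Data Catalog client factory let Hudi sync table metadata to Glue,
    # and the s3a options wire in the temporary (session) credentials read above.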
    spark = SparkSession.builder \
        .appName("Spark-on-AWS-Lambda") \
        .master("local[*]") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .config("spark.driver.memory", "5g") \
        .config("spark.executor.memory", "5g") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
        .config("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
        .config("spark.hadoop.fs.s3a.session.token", session_token) \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \
        .enableHiveSupport() \
        .getOrCreate()

    print("Started reading the CSV file from S3 location", input_path)

    df = spark.read.option('header', 'true').csv(input_path)
    df = df.withColumn("last_upd_timestamp", current_timestamp())
    df.show()

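    # Hudi write configuration: Customer_ID is the record key and
    # last_upd_timestamp is the precombine field used to pick the latest record.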
    hudi_options = {
        'hoodie.table.name': 'customer_table',
        'hoodie.datasource.write.recordkey.field': 'Customer_ID',
        'hoodie.datasource.write.precombine.field': 'last_upd_timestamp',
        'hoodie.insert.shuffle.parallelism': 2,
        'hoodie.datasource.hive_sync.enable': 'false',
        'hoodie.datasource.hive_sync.database': 'default',
        'hoodie.datasource.hive_sync.table': 'customer_table',
        'hoodie.datasource.hive_sync.use_jdbc': 'false',
        'hoodie.datasource.hive_sync.mode': 'hms',
        'hoodie.write.markers.type': 'direct',     # Not recommended in general; used here until a workaround without this config is available.
        'hoodie.embed.timeline.server': 'false'    # Not recommended in general; used here until a workaround without this config is available.
    }

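    # Note: "overwrite" mode replaces any existing data at the target Hudi table path on every run.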
    print("Started writing the dataframe to the target Hudi table at", target_path)
    df.write.format("hudi").options(**hudi_options).mode("overwrite").save(target_path)
    # df.write.format("csv").save(target_path)

if __name__ == '__main__':
    spark_script()