Commit 74dced2

Merge pull request #30 from awslabs/spark-with-glue-table
Add Spark.create_glue_table() and Athena.repair_table()
2 parents 54a0d2c + d33ba61 commit 74dced2

File tree

10 files changed: +259 additions, -20 deletions

README.md

Lines changed: 32 additions & 6 deletions

@@ -22,11 +22,12 @@
  * Pandas -> Redshift (Parallel)
  * CSV (S3) -> Pandas (One shot or Batching)
  * Athena -> Pandas (One shot or Batching)
- * CloudWatch Logs Insights -> Pandas (NEW :star:)
- * Encrypt Pandas Dataframes on S3 with KMS keys (NEW :star:)
+ * CloudWatch Logs Insights -> Pandas
+ * Encrypt Pandas Dataframes on S3 with KMS keys

  ### PySpark
- * PySpark -> Redshift (Parallel) (NEW :star:)
+ * PySpark -> Redshift (Parallel)
+ * Register Glue table from Dataframe stored on S3 (NEW :star:)

  ### General
  * List S3 objects (Parallel)

@@ -35,7 +36,8 @@
  * Delete NOT listed S3 objects (Parallel)
  * Copy listed S3 objects (Parallel)
  * Get the size of S3 objects (Parallel)
- * Get CloudWatch Logs Insights query results (NEW :star:)
+ * Get CloudWatch Logs Insights query results
+ * Load partitions on Athena/Glue table (repair table) (NEW :star:)

  ## Installation

@@ -166,6 +168,23 @@ session.spark.to_redshift(
  )
  ```

+ #### Register Glue table from Dataframe stored on S3
+
+ ```py3
+ dataframe.write \
+     .mode("overwrite") \
+     .format("parquet") \
+     .partitionBy(["year", "month"]) \
+     .save(compression="gzip", path="s3://...")
+ session = awswrangler.Session(spark_session=spark)
+ session.spark.create_glue_table(dataframe=dataframe,
+                                 file_format="parquet",
+                                 partition_by=["year", "month"],
+                                 path="s3://...",
+                                 compression="gzip",
+                                 database="my_database")
+ ```

  ### General

  #### Deleting a bunch of S3 objects (parallel :rocket:)

@@ -185,6 +204,13 @@ results = session.cloudwatchlogs.query(
  )
  ```

+ #### Load partitions on Athena/Glue table (repair table)
+
+ ```py3
+ session = awswrangler.Session()
+ session.athena.repair_table(database="db_name", table="tbl_name")
+ ```

  ## Diving Deep

  ### Pandas to Redshift Flow

@@ -217,13 +243,13 @@ results = session.cloudwatchlogs.query(
  * Fork the AWS Data Wrangler repository and clone that into your development environment

- * Go to the project's directory create a Python's virtual environment for the project (**python -m venv venv && source source venv/bin/activate**)
+ * Go to the project's directory create a Python's virtual environment for the project (**python -m venv venv && source venv/bin/activate**)

  * Run **./install-dev.sh**

  * Go to the *testing* directory

- * Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World!)
+ * Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World! Configure your security group to only give access for your IP.)

  * Deploy the Cloudformation stack **./deploy-cloudformation.sh**
awswrangler/athena.py

Lines changed: 22 additions & 0 deletions

@@ -118,3 +118,25 @@ def wait_query(self, query_execution_id):
              raise QueryCancelled(
                  response["QueryExecution"]["Status"].get("StateChangeReason"))
          return response
+
+     def repair_table(self, database, table, s3_output=None):
+         """
+         Hive's metastore consistency check
+         "MSCK REPAIR TABLE table;"
+         Recovers partitions and data associated with partitions.
+         Use this statement when you add partitions to the catalog.
+         It is possible it will take some time to add all partitions.
+         If this operation times out, it will be in an incomplete state
+         where only a few partitions are added to the catalog.
+
+         :param database: Glue database name
+         :param table: Glue table name
+         :param s3_output: AWS S3 path
+         :return: Query execution ID
+         """
+         query = f"MSCK REPAIR TABLE {table};"
+         query_id = self.run_query(query=query,
+                                   database=database,
+                                   s3_output=s3_output)
+         self.wait_query(query_execution_id=query_id)
+         return query_id
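The new method boils down to issuing one Hive DDL statement and then blocking until Athena finishes. Here is a minimal standalone sketch of that run-then-wait flow; `FakeAthena` is a hypothetical stub standing in for the real session plumbing, and only the query string and call order mirror the diff:

```python
class FakeAthena:
    """Hypothetical stub for awswrangler's Athena client (for illustration only)."""

    def __init__(self):
        self.calls = []

    def run_query(self, query, database, s3_output=None):
        # Records the submitted statement and returns a fake execution ID.
        self.calls.append(("run", query, database))
        return "query-id-123"

    def wait_query(self, query_execution_id):
        # Records that we blocked on the given execution ID.
        self.calls.append(("wait", query_execution_id))

    def repair_table(self, database, table, s3_output=None):
        # Same shape as the diff: build the MSCK statement, fire it, then wait.
        query = f"MSCK REPAIR TABLE {table};"
        query_id = self.run_query(query=query,
                                  database=database,
                                  s3_output=s3_output)
        self.wait_query(query_execution_id=query_id)
        return query_id

athena = FakeAthena()
athena.repair_table(database="db", table="tbl")
assert athena.calls[0] == ("run", "MSCK REPAIR TABLE tbl;", "db")
```

Because `wait_query` is called before returning, callers (such as `create_glue_table` below) see the partitions fully registered once the method returns.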

awswrangler/glue.py

Lines changed: 2 additions & 2 deletions

@@ -141,7 +141,7 @@ def metadata_to_glue(self,
              partition_cols=partition_cols,
              preserve_index=preserve_index,
              cast_columns=cast_columns)
-         table = table if table else Glue._parse_table_name(path)
+         table = table if table else Glue.parse_table_name(path)
          table = table.lower().replace(".", "_")
          if mode == "overwrite":
              self.delete_table_if_exists(database=database, table=table)

@@ -301,7 +301,7 @@ def _build_schema(dataframe,
          return schema_built, partition_cols_schema_built

      @staticmethod
-     def _parse_table_name(path):
+     def parse_table_name(path):
          if path[-1] == "/":
              path = path[:-1]
          return path.rpartition("/")[2]
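The rename only drops the leading underscore, making the helper public so `spark.py` can derive a table name from an S3 path. Its behavior, replicated as a standalone sketch:

```python
def parse_table_name(path: str) -> str:
    # Same logic as the renamed helper: drop a trailing slash,
    # then keep everything after the last remaining "/".
    if path[-1] == "/":
        path = path[:-1]
    return path.rpartition("/")[2]

assert parse_table_name("s3://bucket/prefix/my_table/") == "my_table"
assert parse_table_name("s3://bucket/my_table") == "my_table"
```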

awswrangler/pandas.py

Lines changed: 4 additions & 4 deletions

@@ -576,12 +576,15 @@ def to_s3(self,
          """
          if compression is not None:
              compression = compression.lower()
+         file_format = file_format.lower()
+         if file_format not in ["parquet", "csv"]:
+             raise UnsupportedFileFormat(file_format)
          if file_format == "csv":
              if compression not in Pandas.VALID_CSV_COMPRESSIONS:
                  raise InvalidCompression(
                      f"{compression} isn't a valid CSV compression. Try: {Pandas.VALID_CSV_COMPRESSIONS}"
                  )
-         if file_format == "parquet":
+         elif file_format == "parquet":
              if compression not in Pandas.VALID_PARQUET_COMPRESSIONS:
                  raise InvalidCompression(
                      f"{compression} isn't a valid PARQUET compression. Try: {Pandas.VALID_PARQUET_COMPRESSIONS}"

@@ -641,9 +644,6 @@ def data_to_s3(self,
          logger.debug(f"procs_io_bound: {procs_io_bound}")
          if path[-1] == "/":
              path = path[:-1]
-         file_format = file_format.lower()
-         if file_format not in ["parquet", "csv"]:
-             raise UnsupportedFileFormat(file_format)
          objects_paths = []
          if procs_cpu_bound > 1:
              bounders = _get_bounders(dataframe=dataframe,
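This change hoists the file-format check out of `data_to_s3` and into `to_s3`, so an unsupported format fails fast, before compression validation or any S3 work. A standalone sketch of the resulting validation order; the exception type and the `VALID_*` lists here are simplified stand-ins, not the library's actual values:

```python
# Stand-in compression whitelists (illustrative, not awswrangler's real lists).
VALID_CSV_COMPRESSIONS = [None, "gzip", "bz2"]
VALID_PARQUET_COMPRESSIONS = [None, "snappy", "gzip"]

def validate(file_format, compression=None):
    if compression is not None:
        compression = compression.lower()
    # Format check now runs first, as in the diff: bad formats fail fast.
    file_format = file_format.lower()
    if file_format not in ["parquet", "csv"]:
        raise ValueError(f"Unsupported file format: {file_format}")
    if file_format == "csv":
        if compression not in VALID_CSV_COMPRESSIONS:
            raise ValueError(f"{compression} isn't a valid CSV compression")
    elif file_format == "parquet":  # elif: the two formats are mutually exclusive
        if compression not in VALID_PARQUET_COMPRESSIONS:
            raise ValueError(f"{compression} isn't a valid PARQUET compression")
    return file_format, compression

assert validate("Parquet", "SNAPPY") == ("parquet", "snappy")
```

The `if` -> `elif` swap is also a small correctness cleanup: after lowercasing, the two branches can never both apply.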

awswrangler/spark.py

Lines changed: 68 additions & 1 deletion

@@ -6,7 +6,7 @@
  from pyspark.sql.functions import floor, rand
  from pyspark.sql.types import TimestampType

- from awswrangler.exceptions import MissingBatchDetected
+ from awswrangler.exceptions import MissingBatchDetected, UnsupportedFileFormat

  logger = logging.getLogger(__name__)

@@ -142,3 +142,70 @@ def write(pandas_dataframe):
              )
          dataframe.unpersist()
          self._session.s3.delete_objects(path=path)
+
+     def create_glue_table(self,
+                           database,
+                           path,
+                           dataframe,
+                           file_format,
+                           compression,
+                           table=None,
+                           serde=None,
+                           sep=",",
+                           partition_by=None,
+                           load_partitions=True,
+                           replace_if_exists=True):
+         """
+         Create a Glue metadata table pointing for some dataset stored on AWS S3.
+
+         :param dataframe: PySpark Dataframe
+         :param file_format: File format (E.g. "parquet", "csv")
+         :param partition_by: Columns used for partitioning
+         :param path: AWS S3 path
+         :param compression: Compression (e.g. gzip, snappy, lzo, etc)
+         :param sep: Separator token for CSV formats (e.g. ",", ";", "|")
+         :param serde: Serializer/Deserializer (e.g. "OpenCSVSerDe", "LazySimpleSerDe")
+         :param database: Glue database name
+         :param table: Glue table name. If not passed, extracted from the path
+         :param load_partitions: Load partitions after the table creation
+         :param replace_if_exists: Drop table and recreates that if already exists
+         :return: None
+         """
+         file_format = file_format.lower()
+         if file_format not in ["parquet", "csv"]:
+             raise UnsupportedFileFormat(file_format)
+         table = table if table else self._session.glue.parse_table_name(path)
+         table = table.lower().replace(".", "_")
+         logger.debug(f"table: {table}")
+         full_schema = dataframe.dtypes
+         if partition_by is None:
+             partition_by = []
+         schema = [x for x in full_schema if x[0] not in partition_by]
+         partitions_schema_tmp = {
+             x[0]: x[1]
+             for x in full_schema if x[0] in partition_by
+         }
+         partitions_schema = [(x, partitions_schema_tmp[x])
+                              for x in partition_by]
+         logger.debug(f"schema: {schema}")
+         logger.debug(f"partitions_schema: {partitions_schema}")
+         if replace_if_exists is not None:
+             self._session.glue.delete_table_if_exists(database=database,
+                                                       table=table)
+         extra_args = {}
+         if file_format == "csv":
+             extra_args["sep"] = sep
+             if serde is None:
+                 serde = "OpenCSVSerDe"
+             extra_args["serde"] = serde
+         self._session.glue.create_table(
+             database=database,
+             table=table,
+             schema=schema,
+             partition_cols_schema=partitions_schema,
+             path=path,
+             file_format=file_format,
+             compression=compression,
+             extra_args=extra_args)
+         if load_partitions:
+             self._session.athena.repair_table(database=database, table=table)
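The core of `create_glue_table` is splitting the Dataframe's dtypes into data columns and partition columns while preserving the caller's `partition_by` order. That slice of the logic, extracted as a plain-Python sketch over a list of `(name, type)` pairs such as PySpark's `DataFrame.dtypes` returns:

```python
def split_schema(full_schema, partition_by=None):
    # full_schema: list of (column_name, spark_type) pairs, e.g. DataFrame.dtypes.
    if partition_by is None:
        partition_by = []
    # Data columns: everything not used for partitioning, in original schema order.
    schema = [x for x in full_schema if x[0] not in partition_by]
    # Partition columns: looked up by name, then re-emitted in partition_by order,
    # mirroring the dict-then-list-comprehension in the diff.
    types = {name: typ for name, typ in full_schema if name in partition_by}
    partitions_schema = [(name, types[name]) for name in partition_by]
    return schema, partitions_schema

dtypes = [("id", "bigint"), ("value", "double"), ("year", "int"), ("month", "int")]
schema, parts = split_schema(dtypes, partition_by=["year", "month"])
assert schema == [("id", "bigint"), ("value", "double")]
assert parts == [("year", "int"), ("month", "int")]
```

Note that the diff guards the drop-and-recreate step with `if replace_if_exists is not None:`, which is truthy for both `True` and `False`; checking the flag's value directly would likely match the parameter's documented intent.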

docs/source/contributing.rst

Lines changed: 2 additions & 2 deletions

@@ -24,13 +24,13 @@ Step-by-step

  * Fork the AWS Data Wrangler repository and clone that into your development environment

- * Go to the project's directory create a Python's virtual environment for the project (**python -m venv venv && source source venv/bin/activate**)
+ * Go to the project's directory create a Python's virtual environment for the project (**python -m venv venv && source venv/bin/activate**)

  * Run **./install-dev.sh**

  * Go to the *testing* directory

- * Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World!)
+ * Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World! Configure your security group to only give access for your IP.)

  * Deploy the Cloudformation stack **./deploy-cloudformation.sh**

docs/source/examples.rst

Lines changed: 26 additions & 0 deletions

@@ -139,6 +139,24 @@ Loading Pyspark Dataframe to Redshift
          mode="append",
      )

+ Register Glue table from Dataframe stored on S3
+ ```````````````````````````````````````````````
+
+ .. code-block:: python
+
+     dataframe.write \
+         .mode("overwrite") \
+         .format("parquet") \
+         .partitionBy(["year", "month"]) \
+         .save(compression="gzip", path="s3://...")
+     session = awswrangler.Session(spark_session=spark)
+     session.spark.create_glue_table(dataframe=dataframe,
+                                     file_format="parquet",
+                                     partition_by=["year", "month"],
+                                     path="s3://...",
+                                     compression="gzip",
+                                     database="my_database")

  General
  -------

@@ -160,3 +178,11 @@ Get CloudWatch Logs Insights query results
          log_group_names=[LOG_GROUP_NAME],
          query="fields @timestamp, @message | sort @timestamp desc | limit 5",
      )
+
+ Load partitions on Athena/Glue table (repair table)
+ ```````````````````````````````````````````````````
+
+ .. code-block:: python
+
+     session = awswrangler.Session()
+     session.athena.repair_table(database="db_name", table="tbl_name")

docs/source/index.rst

Lines changed: 6 additions & 4 deletions

@@ -20,12 +20,13 @@ Pandas
  * Pandas -> Redshift (Parallel)
  * CSV (S3) -> Pandas (One shot or Batching)
  * Athena -> Pandas (One shot or Batching)
- * CloudWatch Logs Insights -> Pandas (NEW)
- * Encrypt Pandas Dataframes on S3 with KMS keys (NEW)
+ * CloudWatch Logs Insights -> Pandas
+ * Encrypt Pandas Dataframes on S3 with KMS keys

  PySpark
  ```````
- * PySpark -> Redshift (Parallel) (NEW)
+ * PySpark -> Redshift (Parallel)
+ * Register Glue table from Dataframe stored on S3 (NEW)

  General
  ```````

@@ -35,7 +36,8 @@ General
  * Delete NOT listed S3 objects (Parallel)
  * Copy listed S3 objects (Parallel)
  * Get the size of S3 objects (Parallel)
- * Get CloudWatch Logs Insights query results (NEW)
+ * Get CloudWatch Logs Insights query results
+ * Load partitions on Athena/Glue table (repair table) (NEW)


  Table Of Contents

testing/template.yaml

Lines changed: 1 addition & 0 deletions

@@ -131,6 +131,7 @@ Resources:
      Properties:
        CatalogId: !Ref AWS::AccountId
        DatabaseInput:
+         Name: awswrangler_test
          Description: AWS Data Wrangler Test Arena - Glue Database

    LogGroup:
