|
1 | 1 | # Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
2 | | -# Licensed under the Amazon Software License (the "License"). You may not use |
3 | | -# this file except in compliance with the License. A copy of the License is |
4 | | -# located at |
5 | | -# |
6 | | -# http://aws.amazon.com/asl/ |
7 | | -# |
8 | | -# and in the "LICENSE" file accompanying this file. This file is distributed |
9 | | -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express |
10 | | -# or implied. See the License for the specific language governing |
11 | | -# permissions and limitations under the License. |
12 | | - |
| 2 | +# SPDX-License-Identifier: MIT-0 |
13 | 3 |
|
14 | 4 | from __future__ import print_function |
15 | 5 |
|
16 | | -import logging |
17 | | -import os |
18 | | - |
19 | 6 | from pyspark.sql.functions import lit, struct, array, col, concat |
20 | 7 |
|
21 | 8 | from awsglue.context import GlueContext |
|
24 | 11 | from hive_metastore_migration import * |
25 | 12 |
|
26 | 13 |
|
27 | | -logging.basicConfig() |
28 | | -logger = logging.getLogger(__name__) |
29 | | -logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO'))) |
30 | | - |
31 | | - |
32 | 14 | def transform_df_to_catalog_import_schema(sql_context, glue_context, df_databases, df_tables, df_partitions): |
33 | 15 | df_databases_array = df_databases.select(df_databases['type'], array(df_databases['item']).alias('items')) |
34 | 16 | df_tables_array = df_tables.select(df_tables['type'], df_tables['database'], |
@@ -81,14 +63,14 @@ def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_ |
81 | 63 | databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA) |
82 | 64 | tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA) |
83 | 65 | partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA) |
84 | | - |
85 | | - # Changes to Prefix on database |
| 66 | + |
| 67 | + # Changes to Prefix on database |
86 | 68 | if db_prefix: |
87 | 69 | databases = databases.withColumn('item', struct(col('item.description'), col('item.locationUri'), concat(lit(db_prefix),col('item.name')).alias('name'), col('item.parameters'))) |
88 | 70 | tables = tables.withColumn("database",concat(lit(db_prefix),col('database')).alias('database')) |
89 | 71 | partitions = partitions.withColumn("database",concat(lit(db_prefix),col('database')).alias('database')) |
90 | 72 | partitions = partitions.withColumn('item', struct(col('item.creationTime'), col('item.creationTime'), concat(lit(db_prefix),col('item.namespaceName')).alias('namespaceName'), col('item.parameters'), col('item.storageDescriptor'), col('item.values'))) |
91 | | - |
| 73 | + |
92 | 74 |
|
93 | 75 | # load |
94 | 76 | import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region) |
@@ -146,7 +128,7 @@ def main(): |
146 | 128 | db_input_dir=options['database_input_path'], |
147 | 129 | tbl_input_dir=options['table_input_path'], |
148 | 130 | parts_input_dir=options['partition_input_path'], |
149 | | - db_prefix=options.get('database_prefix') or '', |
| 131 | + db_prefix=options.get('database_prefix') or '', |
150 | 132 | datacatalog_name='datacatalog', |
151 | 133 | region=options.get('region') or 'us-east-1' |
152 | 134 | ) |
|
0 commit comments