From 951b31ce7af0d90c2bea8ae16511e42a38e1bba4 Mon Sep 17 00:00:00 2001 From: sudharshanr Date: Mon, 2 Dec 2024 07:48:38 +0530 Subject: [PATCH 1/5] initial commit to support parquet --- .../xtable/model/storage/TableFormat.java | 3 +- xtable-core/pom.xml | 6 + .../parquet/ParquetConversionSource.java | 204 ++++++++++++++++++ .../ParquetConversionSourceProvider.java | 36 ++++ .../xtable/parquet/ParquetTableExtractor.java | 72 +++++++ .../resources/xtable-conversion-defaults.yaml | 2 + 6 files changed, 322 insertions(+), 1 deletion(-) create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java index bea0b4774..22d26277e 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java @@ -27,8 +27,9 @@ public class TableFormat { public static final String HUDI = "HUDI"; public static final String ICEBERG = "ICEBERG"; public static final String DELTA = "DELTA"; + public static final String PARQUET="PARQUET"; public static String[] values() { - return new String[] {"HUDI", "ICEBERG", "DELTA"}; + return new String[] {"HUDI", "ICEBERG", "DELTA","PARQUET"}; } } diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index f277495e7..5c3a2359e 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -173,5 +173,11 @@ log4j-slf4j2-impl test + + + org.apache.parquet + parquet-avro + + diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java new file mode 100644 index 000000000..7e3d63266 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java @@ -0,0 +1,204 @@ +package org.apache.xtable.parquet; + +import lombok.Builder; +import lombok.NonNull; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.xtable.avro.AvroSchemaConverter; + +import org.apache.xtable.model.*; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.extractor.ConversionSource; + +import java.io.IOException; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; + +@Builder +public class ParquetConversionSource implements ConversionSource { + + private final String tableName; + private final String 
basePath; + @NonNull + private final Configuration hadoopConf; + +// @Getter(value = AccessLevel.PACKAGE) + // private final FileStatus latestFileStatus = initFileStatus(); + + @Builder.Default + private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance(); + + private FileStatus initFileStatus() { + try { + FileSystem fs = FileSystem.get(hadoopConf); + Optional latestFile =Arrays.stream(fs.listStatus(new Path(basePath))). + filter(FileStatus::isFile) + .max(Comparator.comparing(FileStatus::getModificationTime)); + System.out.println("Latest file "+latestFile.get()); + return latestFile.get(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public InternalTable getTable(Long aLong) { + ParquetMetadata metadata = null; + try { + FileSystem fs = FileSystem.get(hadoopConf); + Optional latestFile =Arrays.stream(fs.listStatus(new Path(basePath))). + filter(FileStatus::isFile) + .max(Comparator.comparing(FileStatus::getModificationTime)); + Path p = latestFile.get().getPath(); + metadata = ParquetFileReader.readFooter(hadoopConf,p); + MessageType messageType = metadata.getFileMetaData().getSchema(); + org.apache.parquet.avro.AvroSchemaConverter avroSchemaConverter = new org.apache.parquet.avro.AvroSchemaConverter(); + Schema avroSchema = avroSchemaConverter.convert(messageType); + InternalSchema schema = schemaExtractor.toInternalSchema(avroSchema); + List partitionFields = Collections.emptyList(); + DataLayoutStrategy dataLayoutStrategy = DataLayoutStrategy.FLAT; + return InternalTable.builder() + .tableFormat(TableFormat.PARQUET) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(schema) + .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime())) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public InternalSnapshot getCurrentSnapshot() { + InternalTable table = getTable(-1L); + + return InternalSnapshot.builder() + .table(table) + .partitionedDataFiles(Collections.emptyList()) + .build(); + } + + @Override + public TableChange getTableChangeForCommit(Long aLong) { + try { + FileSystem fs = FileSystem.get(hadoopConf); + List tableChanges =Arrays.stream(fs.listStatus(new Path(basePath))). 
+ filter(FileStatus::isFile) + .filter(fileStatus -> fileStatus.getModificationTime() > aLong) + .collect(Collectors.toList()); + InternalTable internalTable = getTable(-1L); + Set internalDataFiles = new HashSet<>(); + + for(FileStatus tableStatus : tableChanges) + { + internalDataFiles.add( + InternalDataFile.builder() + .physicalPath(tableStatus.getPath().toString()) + .partitionValues(Collections.emptyList()) + .lastModified(tableStatus.getModificationTime()) + .fileSizeBytes(tableStatus.getLen()) + .columnStats(getColumnStatsForaFile(tableStatus,internalTable)) + .build() + ); + + } + + return TableChange.builder().tableAsOfChange(internalTable) + .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build()) + .build(); + + + } catch (IOException e) { + throw new RuntimeException(e); + } + + + } + + @Override + public CommitsBacklog getCommitsBacklog(InstantsForIncrementalSync instantsForIncrementalSync) { + System.out.println("getCommitsBacklog is called even for full sync"+instantsForIncrementalSync); + List commitsToProcess = Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli()); + + return CommitsBacklog.builder().commitsToProcess(commitsToProcess).build(); + } + + @Override + public boolean isIncrementalSyncSafeFrom(Instant instant) { + return true; + } + + @Override + public void close() throws IOException { + + } + + private List getColumnStatsForaFile(FileStatus fileStatus, InternalTable internalTable) + { + try { + List columnStats = new ArrayList<>(); + ParquetMetadata parquetMetadata =ParquetFileReader.readFooter(hadoopConf,fileStatus.getPath()); + + for(InternalField field : internalTable.getReadSchema().getAllFields()) + { + Optional columnStatistics = parquetMetadata.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .filter(column -> + { + System.out.println("column "+column.getPath().toString()); + System.out.println("column "+column.getPath().toDotString()); + System.out.println("field.getPath() "+field.getPath()); + + return column.getPath().toDotString().equals(field.getPath()); + } ) + .map(column -> column.getStatistics()) + .reduce((rg1,rg2) -> { + System.out.println("rg1 "+rg1.genericGetMin()); + System.out.println("rg2 "+rg2.genericGetMin()); + + rg1.mergeStatistics(rg2); + System.out.println("rg1 "+rg1.genericGetMin()); + + return rg1; + }); + if(!columnStatistics.isPresent()) + { + System.out.println("Column stats null for "+field.getPath()); + System.out.println(""); + } + columnStats.add( + ColumnStat.builder() + .field(field) + .numNulls(columnStatistics.get().getNumNulls()) + .range(Range.vector(columnStatistics.get().genericGetMin(),columnStatistics.get().genericGetMax())) + .build() + ) ; + } +System.out.println("sijze of column stats "+columnStats.get(0).getField()+" "+columnStats.get(0).getRange()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return Collections.emptyList(); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java new file mode 100644 index 000000000..8cb2ca2c7 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider {
+  @Override
+  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+
+    return ParquetConversionSource.builder()
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
+        .hadoopConf(new Configuration())
+        .build();
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java
new file mode 100644
index 000000000..f1bd41fb3
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.xtable.parquet; + +import lombok.Builder; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.delta.DeltaLog; +import org.apache.spark.sql.delta.Snapshot; +import org.apache.xtable.avro.AvroSchemaConverter; +import org.apache.xtable.delta.DeltaPartitionExtractor; +import org.apache.xtable.delta.DeltaSchemaExtractor; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; +import scala.Option; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. + */ +@Builder +public class ParquetTableExtractor { + @Builder.Default + private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance(); + + public InternalTable table(Configuration conf, String basePath, String tableName, Long latestFileTimeStamp) throws IOException { + + ParquetMetadata metadata = ParquetFileReader.readFooter(conf,new Path(basePath)); + MessageType messageType = metadata.getFileMetaData().getSchema(); + org.apache.parquet.avro.AvroSchemaConverter avroSchemaConverter = new org.apache.parquet.avro.AvroSchemaConverter(); + Schema avroSchema = avroSchemaConverter.convert(messageType); + InternalSchema schema = schemaExtractor.toInternalSchema(avroSchema); + List partitionFields =Collections.emptyList(); + DataLayoutStrategy dataLayoutStrategy = DataLayoutStrategy.FLAT; + return InternalTable.builder() + .tableFormat(TableFormat.PARQUET) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(schema) + .latestCommitTime(Instant.ofEpochMilli(latestFileTimeStamp)) + .build(); + } +} diff --git a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml index c80c939bf..0b206d85c 100644 --- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml +++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml @@ -27,6 +27,8 @@ ## configuration: A map of configuration values specific to this converter. 
tableFormatConverters: + PARQUET: + conversionSourceProviderClass: org.apache.xtable.parquet.ParquetConversionSourceProvider HUDI: conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider DELTA: From 24efa0fc8468b4ee5a1bb69ef582b5743a4a9452 Mon Sep 17 00:00:00 2001 From: sudharshanr Date: Thu, 5 Dec 2024 07:22:12 +0530 Subject: [PATCH 2/5] Code enhancements --- .../parquet/ParquetConversionSource.java | 463 ++++++++++++------ .../ParquetInternalDataFileConvertor.java | 5 + .../parquet/ParquetPartitionExtractor.java | 85 ++++ .../resources/xtable-conversion-defaults.yaml | 1 + 4 files changed, 409 insertions(+), 145 deletions(-) create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java index 7e3d63266..37189a859 100644 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java @@ -4,9 +4,7 @@ import lombok.NonNull; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetFileReader; @@ -18,187 +16,362 @@ import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; -import org.apache.xtable.model.storage.DataFilesDiff; -import org.apache.xtable.model.storage.DataLayoutStrategy; -import org.apache.xtable.model.storage.InternalDataFile; -import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.model.storage.*; +import org.apache.xtable.schema.SchemaFieldFinder; import org.apache.xtable.spi.extractor.ConversionSource; import java.io.IOException; +import java.nio.file.Paths; import java.time.Instant; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; @Builder public class ParquetConversionSource implements ConversionSource { - private final String tableName; - private final String basePath; - @NonNull - private final Configuration hadoopConf; + private final String tableName; + private final String basePath; + @NonNull private final Configuration hadoopConf; -// @Getter(value = AccessLevel.PACKAGE) + // @Getter(value = AccessLevel.PACKAGE) // private final FileStatus latestFileStatus = initFileStatus(); - @Builder.Default - private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance(); - - private FileStatus initFileStatus() { - try { - FileSystem fs = FileSystem.get(hadoopConf); - Optional latestFile =Arrays.stream(fs.listStatus(new Path(basePath))). 
- filter(FileStatus::isFile) - .max(Comparator.comparing(FileStatus::getModificationTime)); - System.out.println("Latest file "+latestFile.get()); - return latestFile.get(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + @Builder.Default + private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance(); - @Override - public InternalTable getTable(Long aLong) { - ParquetMetadata metadata = null; - try { - FileSystem fs = FileSystem.get(hadoopConf); - Optional latestFile =Arrays.stream(fs.listStatus(new Path(basePath))). - filter(FileStatus::isFile) - .max(Comparator.comparing(FileStatus::getModificationTime)); - Path p = latestFile.get().getPath(); - metadata = ParquetFileReader.readFooter(hadoopConf,p); - MessageType messageType = metadata.getFileMetaData().getSchema(); - org.apache.parquet.avro.AvroSchemaConverter avroSchemaConverter = new org.apache.parquet.avro.AvroSchemaConverter(); - Schema avroSchema = avroSchemaConverter.convert(messageType); - InternalSchema schema = schemaExtractor.toInternalSchema(avroSchema); - List partitionFields = Collections.emptyList(); - DataLayoutStrategy dataLayoutStrategy = DataLayoutStrategy.FLAT; - return InternalTable.builder() - .tableFormat(TableFormat.PARQUET) - .basePath(basePath) - .name(tableName) - .layoutStrategy(dataLayoutStrategy) - .partitioningFields(partitionFields) - .readSchema(schema) - .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime())) - .build(); - } catch (IOException e) { - throw new RuntimeException(e); - } + private static Stream remoteIteratorToStream(RemoteIterator remoteIterator) { + Iterator iterator = + new Iterator() { + @Override + public boolean hasNext() { + try { + return remoteIterator.hasNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public T next() { + try { + return remoteIterator.next(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); + } + + private Stream initFileStatus() { + try { + FileSystem fs = FileSystem.get(hadoopConf); + RemoteIterator locatedFileStatusRemoteIterator = + fs.listFiles(new Path(basePath), true); + Stream locatedFileStatusStream = + remoteIteratorToStream(locatedFileStatusRemoteIterator) + .filter(f -> f.getPath().getName().endsWith("parquet")); + return locatedFileStatusStream; + } catch (IOException e) { + throw new RuntimeException(e); } + } - @Override - public InternalSnapshot getCurrentSnapshot() { - InternalTable table = getTable(-1L); + @Override + public InternalTable getTable(Long aLong) { + ParquetMetadata metadata = null; + try { - return InternalSnapshot.builder() - .table(table) - .partitionedDataFiles(Collections.emptyList()) - .build(); + Optional latestFile = + initFileStatus().max(Comparator.comparing(LocatedFileStatus::getModificationTime)); + metadata = ParquetFileReader.readFooter(hadoopConf, latestFile.get().getPath()); + metadata.getBlocks().stream().mapToLong(b -> b.getRowCount()).sum(); + MessageType messageType = metadata.getFileMetaData().getSchema(); + org.apache.parquet.avro.AvroSchemaConverter avroSchemaConverter = + new org.apache.parquet.avro.AvroSchemaConverter(); + Schema avroSchema = avroSchemaConverter.convert(messageType); + InternalSchema schema = schemaExtractor.toInternalSchema(avroSchema); + + Set partitionKeys = getPartitionFromDirectoryStructure(basePath).keySet(); + List partitionFields = 
+ partitionKeys.isEmpty() + ? Collections.emptyList() + : getInternalPartitionField(partitionKeys, schema); + DataLayoutStrategy dataLayoutStrategy = + partitionFields.isEmpty() + ? DataLayoutStrategy.FLAT + : DataLayoutStrategy.HIVE_STYLE_PARTITION; + return InternalTable.builder() + .tableFormat(TableFormat.PARQUET) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(schema) + .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime())) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); } + } - @Override - public TableChange getTableChangeForCommit(Long aLong) { - try { - FileSystem fs = FileSystem.get(hadoopConf); - List tableChanges =Arrays.stream(fs.listStatus(new Path(basePath))). - filter(FileStatus::isFile) - .filter(fileStatus -> fileStatus.getModificationTime() > aLong) - .collect(Collectors.toList()); - InternalTable internalTable = getTable(-1L); - Set internalDataFiles = new HashSet<>(); - - for(FileStatus tableStatus : tableChanges) - { - internalDataFiles.add( - InternalDataFile.builder() - .physicalPath(tableStatus.getPath().toString()) - .partitionValues(Collections.emptyList()) - .lastModified(tableStatus.getModificationTime()) - .fileSizeBytes(tableStatus.getLen()) - .columnStats(getColumnStatsForaFile(tableStatus,internalTable)) - .build() - ); + private List getInternalPartitionField( + Set partitionList, InternalSchema schema) { + List partitionFields = new ArrayList<>(); - } + for (String partitionKey : partitionList) { + + partitionFields.add( + InternalPartitionField.builder() + .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey)) + .transformType(PartitionTransformType.VALUE) + .build()); + } + + return partitionFields; + } - return TableChange.builder().tableAsOfChange(internalTable) - .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build()) - .build(); + private Map> getPartitionFromDirectoryStructure(String basePath) { + try { + FileSystem fs = FileSystem.get(hadoopConf); + FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath)); + Map> partitionMap = new HashMap<>(); - } catch (IOException e) { - throw new RuntimeException(e); + for (FileStatus dirStatus : baseFileStatus) { + if (dirStatus.isDirectory()) { + String partitionPath = dirStatus.getPath().getName(); + if (partitionPath.contains("=")) { + String[] partitionKeyValue = partitionPath.split("="); + partitionMap + .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList()) + .add(partitionKeyValue[1]); + } } + } + System.out.println("Detected partition " + partitionMap); + return partitionMap; + } catch (IOException e) { + throw new RuntimeException(e); } + } + + @Override + public InternalSnapshot getCurrentSnapshot() { + + List latestFile = initFileStatus().collect(Collectors.toList()); + InternalTable table = getTable(-1L); + Map> partitionKeys = getPartitionFromDirectoryStructure(basePath); + List partitionFields = + getInternalPartitionField(partitionKeys.keySet(), table.getReadSchema()); + + List internalDataFiles = + latestFile.stream() + .map( + file -> { + System.out.println("Get file path " + file.getPath().toString()); - @Override - public CommitsBacklog getCommitsBacklog(InstantsForIncrementalSync instantsForIncrementalSync) { - System.out.println("getCommitsBacklog is called even for full sync"+instantsForIncrementalSync); - List commitsToProcess = 
Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli()); + return InternalDataFile.builder() + .physicalPath(file.getPath().toString()) + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(file.getLen()) + .partitionValues( + getPartitionValue(file.getPath().toString(), table.getReadSchema())) + .lastModified(file.getModificationTime()) + .columnStats(getColumnStatsForaFile(file, table)) + .build(); + }) + .collect(Collectors.toList()); - return CommitsBacklog.builder().commitsToProcess(commitsToProcess).build(); + return InternalSnapshot.builder() + .table(table) + .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles)) + .build(); + } + + private List getPartitionValue(String filePath, InternalSchema schema) { + System.out.println("Getting partition value for file " + filePath); + List partitionValues = new ArrayList<>(); + Map> partitionKeys = getPartitionFromDirectoryStructure(basePath); + java.nio.file.Path base = Paths.get(basePath).normalize(); + java.nio.file.Path file = Paths.get(filePath).normalize(); + java.nio.file.Path relative = base.relativize(file); + for (Map.Entry> entry : partitionKeys.entrySet()) { + String key = entry.getKey(); + List values = entry.getValue(); + for (String value : values) { + String pathCheck = key + "=" + value; + if (relative.startsWith(pathCheck)) { + System.out.println("Relative " + relative + " " + pathCheck); + partitionValues.add( + PartitionValue.builder() + .partitionField( + InternalPartitionField.builder() + .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, key)) + .transformType(PartitionTransformType.VALUE) + .build()) + .range(Range.scalar(value)) + .build()); + } + } } - @Override - public boolean isIncrementalSyncSafeFrom(Instant instant) { - return true; + return partitionValues; + } + + @Override + public TableChange getTableChangeForCommit(Long aLong) { + try { + FileSystem fs = FileSystem.get(hadoopConf); + + List tableChanges = + Arrays.stream(fs.listStatus(new Path(basePath))) + .filter(FileStatus::isFile) + .filter(fileStatus -> fileStatus.getModificationTime() > aLong) + .collect(Collectors.toList()); + InternalTable internalTable = getTable(-1L); + Set internalDataFiles = new HashSet<>(); + System.out.println("Table changes " + tableChanges); + for (FileStatus tableStatus : tableChanges) { + System.out.println("Trying to get stats for " + tableStatus.getPath().toString()); + internalDataFiles.add( + InternalDataFile.builder() + .physicalPath(tableStatus.getPath().toString()) + .partitionValues(Collections.emptyList()) + .lastModified(tableStatus.getModificationTime()) + .fileSizeBytes(tableStatus.getLen()) + .columnStats(getColumnStatsForaFile(tableStatus, internalTable)) + .build()); + } + + return TableChange.builder() + .tableAsOfChange(internalTable) + .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build()) + .build(); + + } catch (IOException e) { + throw new RuntimeException(e); } + } + + @Override + public CommitsBacklog getCommitsBacklog( + InstantsForIncrementalSync instantsForIncrementalSync) { + System.out.println( + "getCommitsBacklog is called even for full sync" + instantsForIncrementalSync); + List commitsToProcess = + Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli()); + + return CommitsBacklog.builder().commitsToProcess(commitsToProcess).build(); + } + + @Override + public boolean isIncrementalSyncSafeFrom(Instant instant) { + return true; + } - @Override - public void close() 
throws IOException { + @Override + public void close() throws IOException {} + private List getColumnStatsForaFile( + FileStatus fileStatus, InternalTable internalTable) { + try { + List columnStats = new ArrayList<>(); + ParquetMetadata parquetMetadata = + ParquetFileReader.readFooter(hadoopConf, fileStatus.getPath()); + + for (InternalField field : internalTable.getReadSchema().getAllFields()) { + Optional columnStatistics = + parquetMetadata.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .filter( + column -> { + return column.getPath().toDotString().equals(field.getPath()); + }) + .map(column -> column.getStatistics()) + .reduce( + (rg1, rg2) -> { + rg1.mergeStatistics(rg2); + System.out.println("rg1 " + rg1.genericGetMin()); + + return rg1; + }); + if (!columnStatistics.isPresent()) { + System.out.println("Column stats null for " + field.getPath()); + System.out.println(""); + } + if (columnStatistics.isPresent()) { + System.out.println("Column stats PRESENT for " + field.getPath()); + System.out.println("Min value " + columnStatistics.get().genericGetMin()); + System.out.println("Min value String " + columnStatistics.get().minAsString()); + + columnStats.add( + ColumnStat.builder() + .field(field) + .numNulls(columnStatistics.get().getNumNulls()) + .range( + Range.vector( + columnStatistics.get().genericGetMin(), + columnStatistics.get().genericGetMax())) + .build()); + } + } + return columnStats; + + } catch (IOException e) { + throw new RuntimeException(e); } + } - private List getColumnStatsForaFile(FileStatus fileStatus, InternalTable internalTable) - { - try { - List columnStats = new ArrayList<>(); - ParquetMetadata parquetMetadata =ParquetFileReader.readFooter(hadoopConf,fileStatus.getPath()); - - for(InternalField field : internalTable.getReadSchema().getAllFields()) - { - Optional columnStatistics = parquetMetadata.getBlocks().stream() - .flatMap(block -> block.getColumns().stream()) - .filter(column -> - { - System.out.println("column "+column.getPath().toString()); - System.out.println("column "+column.getPath().toDotString()); - System.out.println("field.getPath() "+field.getPath()); - - return column.getPath().toDotString().equals(field.getPath()); - } ) - .map(column -> column.getStatistics()) - .reduce((rg1,rg2) -> { - System.out.println("rg1 "+rg1.genericGetMin()); - System.out.println("rg2 "+rg2.genericGetMin()); - - rg1.mergeStatistics(rg2); - System.out.println("rg1 "+rg1.genericGetMin()); - - return rg1; - }); - if(!columnStatistics.isPresent()) - { - System.out.println("Column stats null for "+field.getPath()); - System.out.println(""); - } - columnStats.add( - ColumnStat.builder() - .field(field) - .numNulls(columnStatistics.get().getNumNulls()) - .range(Range.vector(columnStatistics.get().genericGetMin(),columnStatistics.get().genericGetMax())) - .build() - ) ; - } -System.out.println("sijze of column stats "+columnStats.get(0).getField()+" "+columnStats.get(0).getRange()); - } catch (IOException e) { - throw new RuntimeException(e); + private List getColumnStatsForaFile( + LocatedFileStatus fileStatus, InternalTable internalTable) { + try { + List columnStats = new ArrayList<>(); + ParquetMetadata parquetMetadata = + ParquetFileReader.readFooter(hadoopConf, fileStatus.getPath()); + + for (InternalField field : internalTable.getReadSchema().getAllFields()) { + Optional columnStatistics = + parquetMetadata.getBlocks().stream() + .flatMap(block -> block.getColumns().stream()) + .filter( + column -> { + return 
column.getPath().toDotString().equals(field.getPath()); + }) + .map(column -> column.getStatistics()) + .reduce( + (rg1, rg2) -> { + rg1.mergeStatistics(rg2); + return rg1; + }); + if (!columnStatistics.isPresent()) { + System.out.println("Column stats null for " + field.getPath()); + } + if (columnStatistics.isPresent()) { + columnStats.add( + ColumnStat.builder() + .field(field) + .numNulls(columnStatistics.get().getNumNulls()) + .range( + Range.vector( + columnStatistics.get().minAsString(), + columnStatistics.get().maxAsString())) + .build()); } + } + return columnStats; - return Collections.emptyList(); + } catch (IOException e) { + throw new RuntimeException(e); } + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java new file mode 100644 index 000000000..f5df4a7c5 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java @@ -0,0 +1,5 @@ +package org.apache.xtable.parquet; + +public class ParquetInternalDataFileConvertor { + +} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java new file mode 100644 index 000000000..3f474829d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java @@ -0,0 +1,85 @@ +package org.apache.xtable.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.xtable.delta.DeltaPartitionExtractor; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.schema.SchemaFieldFinder; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ParquetPartitionExtractor { + private static final ParquetPartitionExtractor INSTANCE = new ParquetPartitionExtractor(); + public static ParquetPartitionExtractor getInstance() { + return INSTANCE; + } + + private Map> getPartitionFromDirectoryStructure(Configuration hadoopConf, String basePath , Map> partitions) { + + try { + FileSystem fs = FileSystem.get(hadoopConf); + FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath)); + Map> partitionMap = new HashMap<>(partitions); + + for (FileStatus dirStatus : baseFileStatus) { + if (dirStatus.isDirectory()) { + String partitionPath = dirStatus.getPath().getName(); + if (partitionPath.contains("=")) { + String[] partitionKeyValue = partitionPath.split("="); + partitionMap + .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList()) + .add(partitionKeyValue[1]); + } + getPartitionFromDirectoryStructure(hadoopConf,dirStatus.getPath().toString(),partitionMap); + } + } + + System.out.println("Detected partition " + partitionMap); + return partitionMap; + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private List getPartitionValue(String basePath , String filePath, InternalSchema schema) { + System.out.println("Getting partition value for file " + filePath); + List partitionValues = new 
ArrayList<>(); + Map> partitionKeys = getPartitionFromDirectoryStructure(basePath); + java.nio.file.Path base = Paths.get(basePath).normalize(); + java.nio.file.Path file = Paths.get(filePath).normalize(); + java.nio.file.Path relative = base.relativize(file); + for (Map.Entry> entry : partitionKeys.entrySet()) { + String key = entry.getKey(); + List values = entry.getValue(); + for (String value : values) { + String pathCheck = key + "=" + value; + if (relative.startsWith(pathCheck)) { + System.out.println("Relative " + relative + " " + pathCheck); + partitionValues.add( + PartitionValue.builder() + .partitionField( + InternalPartitionField.builder() + .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, key)) + .transformType(PartitionTransformType.VALUE) + .build()) + .range(Range.scalar(value)) + .build()); + } + } + } + + return partitionValues; + } +} diff --git a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml index 0b206d85c..af646dbbd 100644 --- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml +++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml @@ -37,6 +37,7 @@ tableFormatConverters: configuration: spark.master: local[2] spark.app.name: xtable + spark.databricks.delta.commitValidation.enabled: false ICEBERG: conversionSourceProviderClass: org.apache.xtable.iceberg.IcebergConversionSourceProvider conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget \ No newline at end of file From 9a4690ffca2b7c3e96245ffdc0ea3a0bee625e29 Mon Sep 17 00:00:00 2001 From: sudharshanr Date: Sat, 7 Dec 2024 12:50:28 +0530 Subject: [PATCH 3/5] First version of code for syncing parquet format Signed-off-by: sudharshanr --- .../xtable/parquet/FileSystemHelper.java | 81 ++++ .../parquet/ParquetConversionSource.java | 426 ++++++------------ .../parquet/ParquetPartitionExtractor.java | 85 ---- .../parquet/ParquetPartitionHelper.java | 69 +++ 4 files changed, 279 insertions(+), 382 deletions(-) create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java delete mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java b/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java new file mode 100644 index 000000000..7eebca90a --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java @@ -0,0 +1,81 @@ +package org.apache.xtable.parquet; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; + +public class FileSystemHelper { + + private static final FileSystemHelper INSTANCE = new FileSystemHelper(); + + public static FileSystemHelper getInstance() { + return INSTANCE; + } + + public Stream getParquetFiles(Configuration hadoopConf, String basePath) { + try { + FileSystem fs = FileSystem.get(hadoopConf); + RemoteIterator iterator = fs.listFiles(new Path(basePath), true); + return remoteIteratorToStream(iterator) + .filter(file -> file.getPath().getName().endsWith("parquet")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public Map> getPartitionFromDirectoryStructure( 
+ Configuration hadoopConf, String basePath, Map> partitionMap) { + + try { + FileSystem fs = FileSystem.get(hadoopConf); + FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath)); + Map> currentPartitionMap = new HashMap<>(partitionMap); + + for (FileStatus dirStatus : baseFileStatus) { + if (dirStatus.isDirectory()) { + String partitionPath = dirStatus.getPath().getName(); + if (partitionPath.contains("=")) { + String[] partitionKeyValue = partitionPath.split("="); + currentPartitionMap + .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>()) + .add(partitionKeyValue[1]); + getPartitionFromDirectoryStructure( + hadoopConf, dirStatus.getPath().toString(), partitionMap); + } + } + } + return currentPartitionMap; + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static Stream remoteIteratorToStream(RemoteIterator remoteIterator) { + Iterator iterator = + new Iterator() { + @Override + public boolean hasNext() { + try { + return remoteIterator.hasNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public T next() { + try { + return remoteIterator.next(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java index 37189a859..f51a633a5 100644 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java @@ -1,37 +1,23 @@ package org.apache.xtable.parquet; +import java.io.IOException; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; import lombok.Builder; import lombok.NonNull; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; - -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.schema.MessageType; import org.apache.xtable.avro.AvroSchemaConverter; - import org.apache.xtable.model.*; -import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.stat.ColumnStat; -import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.stat.Range; import org.apache.xtable.model.storage.*; -import org.apache.xtable.schema.SchemaFieldFinder; import org.apache.xtable.spi.extractor.ConversionSource; -import java.io.IOException; -import java.nio.file.Paths; -import java.time.Instant; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - @Builder public class ParquetConversionSource implements ConversionSource { @@ -39,157 +25,102 @@ public class ParquetConversionSource implements ConversionSource { private final String basePath; @NonNull private final Configuration hadoopConf; - // @Getter(value = AccessLevel.PACKAGE) - // private final FileStatus latestFileStatus = initFileStatus(); - @Builder.Default private static final 
AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance(); - private static Stream remoteIteratorToStream(RemoteIterator remoteIterator) { - Iterator iterator = - new Iterator() { - @Override - public boolean hasNext() { - try { - return remoteIterator.hasNext(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance(); - @Override - public T next() { - try { - return remoteIterator.next(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); - } + @Builder.Default + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); - private Stream initFileStatus() { - try { - FileSystem fs = FileSystem.get(hadoopConf); - RemoteIterator locatedFileStatusRemoteIterator = - fs.listFiles(new Path(basePath), true); - Stream locatedFileStatusStream = - remoteIteratorToStream(locatedFileStatusRemoteIterator) - .filter(f -> f.getPath().getName().endsWith("parquet")); - return locatedFileStatusStream; - } catch (IOException e) { - throw new RuntimeException(e); - } + @Builder.Default + private static final ParquetPartitionHelper parquetPartitionHelper = + ParquetPartitionHelper.getInstance(); + + private Map> initPartitionInfo() { + return fsHelper.getPartitionFromDirectoryStructure( + hadoopConf, basePath, Collections.emptyMap()); } + /** + * To infer schema getting the latest file assumption is that latest file will have new fields + * + * @param modificationTime the commit to consider for reading the table state + * @return + */ @Override - public InternalTable getTable(Long aLong) { - ParquetMetadata metadata = null; - try { + public InternalTable getTable(Long modificationTime) { - Optional latestFile = - initFileStatus().max(Comparator.comparing(LocatedFileStatus::getModificationTime)); - metadata = ParquetFileReader.readFooter(hadoopConf, latestFile.get().getPath()); - metadata.getBlocks().stream().mapToLong(b -> b.getRowCount()).sum(); - MessageType messageType = metadata.getFileMetaData().getSchema(); - org.apache.parquet.avro.AvroSchemaConverter avroSchemaConverter = - new org.apache.parquet.avro.AvroSchemaConverter(); - Schema avroSchema = avroSchemaConverter.convert(messageType); - InternalSchema schema = schemaExtractor.toInternalSchema(avroSchema); - - Set partitionKeys = getPartitionFromDirectoryStructure(basePath).keySet(); - List partitionFields = - partitionKeys.isEmpty() - ? Collections.emptyList() - : getInternalPartitionField(partitionKeys, schema); - DataLayoutStrategy dataLayoutStrategy = - partitionFields.isEmpty() - ? 
DataLayoutStrategy.FLAT - : DataLayoutStrategy.HIVE_STYLE_PARTITION; - return InternalTable.builder() - .tableFormat(TableFormat.PARQUET) - .basePath(basePath) - .name(tableName) - .layoutStrategy(dataLayoutStrategy) - .partitioningFields(partitionFields) - .readSchema(schema) - .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime())) - .build(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + Optional latestFile = + fsHelper + .getParquetFiles(hadoopConf, basePath) + .max(Comparator.comparing(FileStatus::getModificationTime)); - private List getInternalPartitionField( - Set partitionList, InternalSchema schema) { - List partitionFields = new ArrayList<>(); + ParquetMetadata parquetMetadata = + parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath()); + Schema tableSchema = + new org.apache.parquet.avro.AvroSchemaConverter() + .convert(parquetMetadata.getFileMetaData().getSchema()); - for (String partitionKey : partitionList) { + Set partitionKeys = initPartitionInfo().keySet(); - partitionFields.add( - InternalPartitionField.builder() - .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey)) - .transformType(PartitionTransformType.VALUE) - .build()); + // merge schema of partition into original as partition is not part of parquet fie + if (!partitionKeys.isEmpty()) { + tableSchema = mergeAvroSchema(tableSchema, partitionKeys); } + InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema); - return partitionFields; - } - - private Map> getPartitionFromDirectoryStructure(String basePath) { - - try { - FileSystem fs = FileSystem.get(hadoopConf); - FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath)); - Map> partitionMap = new HashMap<>(); - - for (FileStatus dirStatus : baseFileStatus) { - if (dirStatus.isDirectory()) { - String partitionPath = dirStatus.getPath().getName(); - if (partitionPath.contains("=")) { - String[] partitionKeyValue = partitionPath.split("="); - partitionMap - .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList()) - .add(partitionKeyValue[1]); - } - } - } - - System.out.println("Detected partition " + partitionMap); - return partitionMap; - - } catch (IOException e) { - throw new RuntimeException(e); - } + List partitionFields = + partitionKeys.isEmpty() + ? Collections.emptyList() + : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema); + DataLayoutStrategy dataLayoutStrategy = + partitionFields.isEmpty() + ? 
DataLayoutStrategy.FLAT + : DataLayoutStrategy.HIVE_STYLE_PARTITION; + return InternalTable.builder() + .tableFormat(TableFormat.PARQUET) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(schema) + .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime())) + .build(); } + /** + * Here to get current snapshot listing all files hence the -1 is being passed + * + * @return + */ @Override public InternalSnapshot getCurrentSnapshot() { - List latestFile = initFileStatus().collect(Collectors.toList()); + List latestFile = + fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList()); + Map> partitionInfo = initPartitionInfo(); InternalTable table = getTable(-1L); - Map> partitionKeys = getPartitionFromDirectoryStructure(basePath); - List partitionFields = - getInternalPartitionField(partitionKeys.keySet(), table.getReadSchema()); - List internalDataFiles = latestFile.stream() .map( - file -> { - System.out.println("Get file path " + file.getPath().toString()); - - return InternalDataFile.builder() - .physicalPath(file.getPath().toString()) - .fileFormat(FileFormat.APACHE_PARQUET) - .fileSizeBytes(file.getLen()) - .partitionValues( - getPartitionValue(file.getPath().toString(), table.getReadSchema())) - .lastModified(file.getModificationTime()) - .columnStats(getColumnStatsForaFile(file, table)) - .build(); - }) + file -> + InternalDataFile.builder() + .physicalPath(file.getPath().toString()) + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(file.getLen()) + .partitionValues( + parquetPartitionHelper.getPartitionValue( + basePath, + file.getPath().toString(), + table.getReadSchema(), + partitionInfo)) + .lastModified(file.getModificationTime()) + .columnStats( + parquetMetadataExtractor.getColumnStatsForaFile( + hadoopConf, file, table)) + .build()) .collect(Collectors.toList()); return InternalSnapshot.builder() @@ -198,82 +129,61 @@ public InternalSnapshot getCurrentSnapshot() { .build(); } - private List getPartitionValue(String filePath, InternalSchema schema) { - System.out.println("Getting partition value for file " + filePath); - List partitionValues = new ArrayList<>(); - Map> partitionKeys = getPartitionFromDirectoryStructure(basePath); - java.nio.file.Path base = Paths.get(basePath).normalize(); - java.nio.file.Path file = Paths.get(filePath).normalize(); - java.nio.file.Path relative = base.relativize(file); - for (Map.Entry> entry : partitionKeys.entrySet()) { - String key = entry.getKey(); - List values = entry.getValue(); - for (String value : values) { - String pathCheck = key + "=" + value; - if (relative.startsWith(pathCheck)) { - System.out.println("Relative " + relative + " " + pathCheck); - partitionValues.add( - PartitionValue.builder() - .partitionField( - InternalPartitionField.builder() - .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, key)) - .transformType(PartitionTransformType.VALUE) - .build()) - .range(Range.scalar(value)) - .build()); - } - } - } - - return partitionValues; - } - + /** + * Whenever new file is added , condition to get new file is listing files whose modification time + * is greater than previous ysnc + * + * @param modificationTime commit to capture table changes for. 
+ * @return + */ @Override - public TableChange getTableChangeForCommit(Long aLong) { - try { - FileSystem fs = FileSystem.get(hadoopConf); - - List tableChanges = - Arrays.stream(fs.listStatus(new Path(basePath))) - .filter(FileStatus::isFile) - .filter(fileStatus -> fileStatus.getModificationTime() > aLong) - .collect(Collectors.toList()); - InternalTable internalTable = getTable(-1L); - Set internalDataFiles = new HashSet<>(); - System.out.println("Table changes " + tableChanges); - for (FileStatus tableStatus : tableChanges) { - System.out.println("Trying to get stats for " + tableStatus.getPath().toString()); - internalDataFiles.add( - InternalDataFile.builder() - .physicalPath(tableStatus.getPath().toString()) - .partitionValues(Collections.emptyList()) - .lastModified(tableStatus.getModificationTime()) - .fileSizeBytes(tableStatus.getLen()) - .columnStats(getColumnStatsForaFile(tableStatus, internalTable)) - .build()); - } - - return TableChange.builder() - .tableAsOfChange(internalTable) - .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build()) - .build(); - - } catch (IOException e) { - throw new RuntimeException(e); + public TableChange getTableChangeForCommit(Long modificationTime) { + List tableChanges = + fsHelper + .getParquetFiles(hadoopConf, basePath) + .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime) + .collect(Collectors.toList()); + // TODO avoid doing full list of directory to get schema , just argument of modification time + // needs to be tweaked + InternalTable internalTable = getTable(-1L); + Set internalDataFiles = new HashSet<>(); + Map> partitionInfo = initPartitionInfo(); + for (FileStatus tableStatus : tableChanges) { + internalDataFiles.add( + InternalDataFile.builder() + .physicalPath(tableStatus.getPath().toString()) + .partitionValues( + parquetPartitionHelper.getPartitionValue( + basePath, + tableStatus.getPath().toString(), + internalTable.getReadSchema(), + partitionInfo)) + .lastModified(tableStatus.getModificationTime()) + .fileSizeBytes(tableStatus.getLen()) + .columnStats( + parquetMetadataExtractor.getColumnStatsForaFile( + hadoopConf, tableStatus, internalTable)) + .build()); } + + return TableChange.builder() + .tableAsOfChange(internalTable) + .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build()) + .build(); } @Override public CommitsBacklog getCommitsBacklog( InstantsForIncrementalSync instantsForIncrementalSync) { - System.out.println( - "getCommitsBacklog is called even for full sync" + instantsForIncrementalSync); + List commitsToProcess = Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli()); return CommitsBacklog.builder().commitsToProcess(commitsToProcess).build(); } + // TODO Need to understnad how this needs to be implemented should _SUCCESS or .staging dir needs + // to be checked @Override public boolean isIncrementalSyncSafeFrom(Instant instant) { return true; @@ -282,96 +192,18 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) { @Override public void close() throws IOException {} - private List getColumnStatsForaFile( - FileStatus fileStatus, InternalTable internalTable) { - try { - List columnStats = new ArrayList<>(); - ParquetMetadata parquetMetadata = - ParquetFileReader.readFooter(hadoopConf, fileStatus.getPath()); - - for (InternalField field : internalTable.getReadSchema().getAllFields()) { - Optional columnStatistics = - parquetMetadata.getBlocks().stream() - .flatMap(block -> block.getColumns().stream()) - .filter( 
- column -> { - return column.getPath().toDotString().equals(field.getPath()); - }) - .map(column -> column.getStatistics()) - .reduce( - (rg1, rg2) -> { - rg1.mergeStatistics(rg2); - System.out.println("rg1 " + rg1.genericGetMin()); + private Schema mergeAvroSchema(Schema internalSchema, Set parititonFields) { - return rg1; - }); - if (!columnStatistics.isPresent()) { - System.out.println("Column stats null for " + field.getPath()); - System.out.println(""); - } - if (columnStatistics.isPresent()) { - System.out.println("Column stats PRESENT for " + field.getPath()); - System.out.println("Min value " + columnStatistics.get().genericGetMin()); - System.out.println("Min value String " + columnStatistics.get().minAsString()); - - columnStats.add( - ColumnStat.builder() - .field(field) - .numNulls(columnStatistics.get().getNumNulls()) - .range( - Range.vector( - columnStatistics.get().genericGetMin(), - columnStatistics.get().genericGetMax())) - .build()); - } - } - return columnStats; - - } catch (IOException e) { - throw new RuntimeException(e); + SchemaBuilder.FieldAssembler fieldAssembler = + SchemaBuilder.record(internalSchema.getName()).fields(); + for (Schema.Field field : internalSchema.getFields()) { + fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault(); } - } - private List getColumnStatsForaFile( - LocatedFileStatus fileStatus, InternalTable internalTable) { - try { - List columnStats = new ArrayList<>(); - ParquetMetadata parquetMetadata = - ParquetFileReader.readFooter(hadoopConf, fileStatus.getPath()); - - for (InternalField field : internalTable.getReadSchema().getAllFields()) { - Optional columnStatistics = - parquetMetadata.getBlocks().stream() - .flatMap(block -> block.getColumns().stream()) - .filter( - column -> { - return column.getPath().toDotString().equals(field.getPath()); - }) - .map(column -> column.getStatistics()) - .reduce( - (rg1, rg2) -> { - rg1.mergeStatistics(rg2); - return rg1; - }); - if (!columnStatistics.isPresent()) { - System.out.println("Column stats null for " + field.getPath()); - } - if (columnStatistics.isPresent()) { - columnStats.add( - ColumnStat.builder() - .field(field) - .numNulls(columnStatistics.get().getNumNulls()) - .range( - Range.vector( - columnStatistics.get().minAsString(), - columnStatistics.get().maxAsString())) - .build()); - } - } - return columnStats; - - } catch (IOException e) { - throw new RuntimeException(e); + for (String paritionKey : parititonFields) { + fieldAssembler = fieldAssembler.name(paritionKey).type().stringType().noDefault(); } + + return fieldAssembler.endRecord(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java deleted file mode 100644 index 3f474829d..000000000 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionExtractor.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.apache.xtable.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.xtable.delta.DeltaPartitionExtractor; -import org.apache.xtable.model.schema.InternalPartitionField; -import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.stat.Range; -import 
org.apache.xtable.schema.SchemaFieldFinder; - -import java.io.IOException; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ParquetPartitionExtractor { - private static final ParquetPartitionExtractor INSTANCE = new ParquetPartitionExtractor(); - public static ParquetPartitionExtractor getInstance() { - return INSTANCE; - } - - private Map> getPartitionFromDirectoryStructure(Configuration hadoopConf, String basePath , Map> partitions) { - - try { - FileSystem fs = FileSystem.get(hadoopConf); - FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath)); - Map> partitionMap = new HashMap<>(partitions); - - for (FileStatus dirStatus : baseFileStatus) { - if (dirStatus.isDirectory()) { - String partitionPath = dirStatus.getPath().getName(); - if (partitionPath.contains("=")) { - String[] partitionKeyValue = partitionPath.split("="); - partitionMap - .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList()) - .add(partitionKeyValue[1]); - } - getPartitionFromDirectoryStructure(hadoopConf,dirStatus.getPath().toString(),partitionMap); - } - } - - System.out.println("Detected partition " + partitionMap); - return partitionMap; - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private List getPartitionValue(String basePath , String filePath, InternalSchema schema) { - System.out.println("Getting partition value for file " + filePath); - List partitionValues = new ArrayList<>(); - Map> partitionKeys = getPartitionFromDirectoryStructure(basePath); - java.nio.file.Path base = Paths.get(basePath).normalize(); - java.nio.file.Path file = Paths.get(filePath).normalize(); - java.nio.file.Path relative = base.relativize(file); - for (Map.Entry> entry : partitionKeys.entrySet()) { - String key = entry.getKey(); - List values = entry.getValue(); - for (String value : values) { - String pathCheck = key + "=" + value; - if (relative.startsWith(pathCheck)) { - System.out.println("Relative " + relative + " " + pathCheck); - partitionValues.add( - PartitionValue.builder() - .partitionField( - InternalPartitionField.builder() - .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, key)) - .transformType(PartitionTransformType.VALUE) - .build()) - .range(Range.scalar(value)) - .build()); - } - } - } - - return partitionValues; - } -} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java new file mode 100644 index 000000000..1c185ced0 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java @@ -0,0 +1,69 @@ +package org.apache.xtable.parquet; + +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.schema.SchemaFieldFinder; + +public class ParquetPartitionHelper { + private static final ParquetPartitionHelper INSTANCE = new ParquetPartitionHelper(); + + public static ParquetPartitionHelper getInstance() { + return INSTANCE; + } + + public List getInternalPartitionField( + Set partitionList, InternalSchema schema) { + List partitionFields = 
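/*
 * Illustrative sketch (hypothetical class name; assumes a hive-style "key=value" directory
 * layout): the partition map that getInternalPartitionField and getPartitionValue consume can
 * be discovered by walking the table directory and collecting every "key=value" segment,
 * along the lines of the directory scan in the deleted ParquetPartitionExtractor.
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class PartitionDiscoverySketch {
  /** Recursively collects partition key -> observed values from directory names. */
  static Map<String, List<String>> discoverPartitions(Configuration conf, String basePath)
      throws IOException {
    Map<String, List<String>> partitions = new HashMap<>();
    collect(FileSystem.get(conf), new Path(basePath), partitions);
    return partitions;
  }

  private static void collect(FileSystem fs, Path dir, Map<String, List<String>> partitions)
      throws IOException {
    for (FileStatus status : fs.listStatus(dir)) {
      if (!status.isDirectory()) {
        continue;
      }
      String name = status.getPath().getName();        // e.g. "date=2024-12-01"
      int split = name.indexOf('=');
      if (split > 0) {
        partitions
            .computeIfAbsent(name.substring(0, split), k -> new ArrayList<>())
            .add(name.substring(split + 1));
      }
      collect(fs, status.getPath(), partitions);        // descend into nested partition dirs
    }
  }
}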
new ArrayList<>(); + + for (String partitionKey : partitionList) { + + partitionFields.add( + InternalPartitionField.builder() + .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey)) + .transformType(PartitionTransformType.VALUE) + .build()); + } + + return partitionFields; + } + + // TODO logic is too complicated can be simplified + public List getPartitionValue( + String basePath, + String filePath, + InternalSchema schema, + Map> partitionInfo) { + List partitionValues = new ArrayList<>(); + java.nio.file.Path base = Paths.get(basePath).normalize(); + java.nio.file.Path file = Paths.get(filePath).normalize(); + java.nio.file.Path relative = base.relativize(file); + for (Map.Entry> entry : partitionInfo.entrySet()) { + String key = entry.getKey(); + List values = entry.getValue(); + for (String value : values) { + String pathCheck = key + "=" + value; + if (relative.startsWith(pathCheck)) { + System.out.println("Relative " + relative + " " + pathCheck); + partitionValues.add( + PartitionValue.builder() + .partitionField( + InternalPartitionField.builder() + .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, key)) + .transformType(PartitionTransformType.VALUE) + .build()) + .range(Range.scalar(value)) + .build()); + } + } + } + return partitionValues; + } +} From fc4dfab8721eb2c2e9e7d77612944a43eb0a0a12 Mon Sep 17 00:00:00 2001 From: sudhar91 Date: Sat, 7 Dec 2024 13:22:14 +0530 Subject: [PATCH 4/5] delete unused files --- .../xtable/parquet/ParquetTableExtractor.java | 72 ------------------- 1 file changed, 72 deletions(-) delete mode 100644 xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java deleted file mode 100644 index f1bd41fb3..000000000 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
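/*
 * Illustrative sketch (pure JDK; the paths, partition key, and class name are hypothetical):
 * the matching step inside getPartitionValue above recovers a file's partition values by
 * relativizing its path against the table base path and testing which "key=value" segments
 * the relative path starts with.
 */
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionValueMatchSketch {
  public static void main(String[] args) {
    Map<String, List<String>> partitionInfo = new LinkedHashMap<>();
    partitionInfo.put("date", Arrays.asList("2024-12-01", "2024-12-02"));

    Path base = Paths.get("/warehouse/events").normalize();
    Path file = Paths.get("/warehouse/events/date=2024-12-02/part-0001.parquet").normalize();
    Path relative = base.relativize(file);              // date=2024-12-02/part-0001.parquet

    List<String> matches = new ArrayList<>();
    partitionInfo.forEach((key, values) -> {
      for (String value : values) {
        if (relative.startsWith(key + "=" + value)) {   // same prefix check as getPartitionValue
          matches.add(key + "=" + value);
        }
      }
    });
    System.out.println(matches);                        // [date=2024-12-02]
  }
}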
- */ - -package org.apache.xtable.parquet; - -import lombok.Builder; -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.schema.MessageType; -import org.apache.spark.sql.delta.DeltaLog; -import org.apache.spark.sql.delta.Snapshot; -import org.apache.xtable.avro.AvroSchemaConverter; -import org.apache.xtable.delta.DeltaPartitionExtractor; -import org.apache.xtable.delta.DeltaSchemaExtractor; -import org.apache.xtable.model.InternalTable; -import org.apache.xtable.model.schema.InternalPartitionField; -import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataLayoutStrategy; -import org.apache.xtable.model.storage.TableFormat; -import scala.Option; - -import java.io.IOException; -import java.time.Instant; -import java.util.Collections; -import java.util.List; - -/** - * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. - */ -@Builder -public class ParquetTableExtractor { - @Builder.Default - private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance(); - - public InternalTable table(Configuration conf, String basePath, String tableName, Long latestFileTimeStamp) throws IOException { - - ParquetMetadata metadata = ParquetFileReader.readFooter(conf,new Path(basePath)); - MessageType messageType = metadata.getFileMetaData().getSchema(); - org.apache.parquet.avro.AvroSchemaConverter avroSchemaConverter = new org.apache.parquet.avro.AvroSchemaConverter(); - Schema avroSchema = avroSchemaConverter.convert(messageType); - InternalSchema schema = schemaExtractor.toInternalSchema(avroSchema); - List partitionFields =Collections.emptyList(); - DataLayoutStrategy dataLayoutStrategy = DataLayoutStrategy.FLAT; - return InternalTable.builder() - .tableFormat(TableFormat.PARQUET) - .basePath(basePath) - .name(tableName) - .layoutStrategy(dataLayoutStrategy) - .partitioningFields(partitionFields) - .readSchema(schema) - .latestCommitTime(Instant.ofEpochMilli(latestFileTimeStamp)) - .build(); - } -} From c9f52f804f25f89713a7432240015fa3efb2f9d2 Mon Sep 17 00:00:00 2001 From: sudhar91 Date: Sat, 7 Dec 2024 18:00:23 +0530 Subject: [PATCH 5/5] remove configs used in dev testing --- .../src/main/resources/xtable-conversion-defaults.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml index af646dbbd..9d3686fb4 100644 --- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml +++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml @@ -37,7 +37,6 @@ tableFormatConverters: configuration: spark.master: local[2] spark.app.name: xtable - spark.databricks.delta.commitValidation.enabled: false ICEBERG: conversionSourceProviderClass: org.apache.xtable.iceberg.IcebergConversionSourceProvider - conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget \ No newline at end of file + conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget