diff --git a/pom.xml b/pom.xml
index bed4d63b4..4c313f4c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
<module>xtable-utilities</module>
<module>xtable-aws</module>
<module>xtable-hive-metastore</module>
- <module>xtable-service</module>
+
@@ -713,7 +713,7 @@
${skipUTs}
- true
+ false
false
120
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 6bd5282c7..c8675e341 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -110,6 +110,19 @@
<scope>test</scope>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-kernel-api</artifactId>
+ <version>4.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-kernel-defaults</artifactId>
+ <version>4.0.0</version>
+ </dependency>
+
<groupId>org.apache.hadoop</groupId>
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelActionsConverter.java
new file mode 100644
index 000000000..6531ebb6e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelActionsConverter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import scala.collection.JavaConverters;
+
+import io.delta.kernel.Table;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.actions.AddFile;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.FileStats;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DeltaKernelActionsConverter {
+ private static final DeltaKernelActionsConverter INSTANCE = new DeltaKernelActionsConverter();
+
+ public static DeltaKernelActionsConverter getInstance() {
+ return INSTANCE;
+ }
+
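+ /**
+ * Converts a Delta Kernel {@link AddFile} action into the canonical {@link InternalDataFile}
+ * representation, carrying over file-level metadata, partition values and (optionally) column
+ * stats.
+ */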
+ public InternalDataFile convertAddActionToInternalDataFile(
+ AddFile addFile,
+ Table table,
+ FileFormat fileFormat,
+ List<InternalPartitionField> partitionFields,
+ List<InternalField> fields,
+ boolean includeColumnStats,
+ DeltaKernelPartitionExtractor partitionExtractor,
+ DeltaKernelStatsExtractor fileStatsExtractor,
+ Map<String, String> partitionValues) {
+ FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields);
+ List<ColumnStat> columnStats =
+ includeColumnStats ? fileStats.getColumnStats() : Collections.emptyList();
+ long recordCount = fileStats.getNumRecords();
+ // The immutable Java-to-Scala map conversion does not work here, so use the mutable variant.
+ scala.collection.mutable.Map<String, String> scalaMap =
+ JavaConverters.mapAsScalaMap(partitionValues);
+
+ return InternalDataFile.builder()
+ .physicalPath(getFullPathToFile(addFile.getPath(), table))
+ .fileFormat(fileFormat)
+ .fileSizeBytes(addFile.getSize())
+ .lastModified(addFile.getModificationTime())
+ .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields))
+ .columnStats(columnStats)
+ .recordCount(recordCount)
+ .build();
+ }
+
+ public FileFormat convertToFileFormat(String provider) {
+ if (provider.equals("parquet")) {
+ return FileFormat.APACHE_PARQUET;
+ } else if (provider.equals("orc")) {
+ return FileFormat.APACHE_ORC;
+ }
+ throw new NotSupportedException(
+ String.format("delta file format %s is not recognized", provider));
+ }
+
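+ /**
+ * Resolves a data file path from the Delta log against the table's base path. Paths that
+ * already start with the base path are returned unchanged; relative paths are prefixed.
+ */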
+ static String getFullPathToFile(String dataFilePath, Table table) {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ String tableBasePath = table.getPath(engine);
+ if (dataFilePath.startsWith(tableBasePath)) {
+ return dataFilePath;
+ }
+ return tableBasePath + Path.SEPARATOR + dataFilePath;
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelConversionSourceProvider.java
new file mode 100644
index 000000000..c81353dac
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelConversionSourceProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.kernel.DeltaKernelConversionSource;
+
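+/**
+ * A {@link ConversionSourceProvider} that builds a {@link DeltaKernelConversionSource} backed by
+ * Delta Kernel's {@link DefaultEngine}.
+ */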
+public class DeltaKernelConversionSourceProvider extends ConversionSourceProvider<Long> {
+ @Override
+ public DeltaKernelConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ return DeltaKernelConversionSource.builder()
+ .tableName(sourceTable.getName())
+ .basePath(sourceTable.getBasePath())
+ .engine(engine)
+ .build();
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelDataFileExtractor.java
new file mode 100644
index 000000000..ecc0c1276
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelDataFileExtractor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.ScanImpl;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.DataFileIterator;
+
+/** DeltaKernelDataFileExtractor lets the consumer iterate over partitions. */
+@Builder
+public class DeltaKernelDataFileExtractor {
+
+ @Builder.Default
+ private final DeltaKernelPartitionExtractor partitionExtractor =
+ DeltaKernelPartitionExtractor.getInstance();
+
+ @Builder.Default
+ private final DeltaKernelStatsExtractor fileStatsExtractor =
+ DeltaKernelStatsExtractor.getInstance();
+
+ @Builder.Default
+ private final DeltaKernelActionsConverter actionsConverter =
+ DeltaKernelActionsConverter.getInstance();
+
+ private final String basePath;
+
+ /**
+ * Initializes an iterator for Delta Lake files.
+ *
+ * @return Delta table file iterator
+ */
+ public DataFileIterator iterator(
+ Snapshot deltaSnapshot, Table table, Engine engine, InternalSchema schema) {
+ return new DeltaDataFileIterator(deltaSnapshot, table, engine, schema, true);
+ }
+
+ public class DeltaDataFileIterator implements DataFileIterator {
+ private final FileFormat fileFormat;
+ private final List<InternalField> fields;
+ private final List<InternalPartitionField> partitionFields;
+ private Iterator<InternalDataFile> dataFilesIterator = Collections.emptyIterator();
+
+ private DeltaDataFileIterator(
+ Snapshot snapshot,
+ Table table,
+ Engine engine,
+ InternalSchema schema,
+ boolean includeColumnStats) {
+ String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
+ this.fileFormat = actionsConverter.convertToFileFormat(provider);
+
+ this.fields = schema.getFields();
+
+ StructType fullSchema = snapshot.getSchema(); // the full table schema
+ List<String> partitionColumns = snapshot.getPartitionColumnNames();
+
+ List<StructField> partitionStructFields =
+ fullSchema.fields().stream()
+ .filter(field -> partitionColumns.contains(field.getName()))
+ .collect(Collectors.toList());
+
+ StructType partitionSchema = new StructType(partitionStructFields);
+
+ this.partitionFields =
+ partitionExtractor.convertFromDeltaPartitionFormat(schema, partitionSchema);
+
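+ // Build a kernel scan over the snapshot; ScanImpl#getScanFiles can include per-file
+ // statistics in the returned batches when includeColumnStats is set.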
+ ScanImpl scan = (ScanImpl) snapshot.getScanBuilder().build();
+ CloseableIterator<FilteredColumnarBatch> scanFiles =
+ scan.getScanFiles(engine, includeColumnStats);
+
+ List<InternalDataFile> dataFiles = new ArrayList<>();
+ while (scanFiles.hasNext()) {
+ FilteredColumnarBatch scanFileColumnarBatch = scanFiles.next();
+ CloseableIterator<Row> scanFileRows = scanFileColumnarBatch.getRows();
+ while (scanFileRows.hasNext()) {
+ Row scanFileRow = scanFileRows.next();
+ // From the scan file row, extract the file path, size and modification time metadata
+ // needed to read the file.
+ AddFile addFile =
+ new AddFile(scanFileRow.getStruct(scanFileRow.getSchema().indexOf("add")));
+ Map<String, String> partitionValues =
+ InternalScanFileUtils.getPartitionValues(scanFileRow);
+ // Convert the AddFile action to an InternalDataFile using the actionsConverter.
+ dataFiles.add(
+ actionsConverter.convertAddActionToInternalDataFile(
+ addFile,
+ table,
+ fileFormat,
+ partitionFields,
+ fields,
+ includeColumnStats,
+ partitionExtractor,
+ fileStatsExtractor,
+ partitionValues));
+ }
+ }
+ this.dataFilesIterator = dataFiles.iterator();
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean hasNext() {
+ return this.dataFilesIterator.hasNext();
+ }
+
+ @Override
+ public InternalDataFile next() {
+ return dataFilesIterator.next();
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelPartitionExtractor.java
new file mode 100644
index 000000000..cf81b73a1
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelPartitionExtractor.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import static org.apache.xtable.collectors.CustomCollectors.toList;
+import static org.apache.xtable.delta.DeltaValueConverter.convertFromDeltaPartitionValue;
+import static org.apache.xtable.delta.DeltaValueConverter.convertToDeltaPartitionValue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import scala.collection.JavaConverters;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import io.delta.kernel.types.*;
+
+import org.apache.xtable.exception.PartitionSpecException;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DeltaKernelPartitionExtractor {
+ private static final DeltaKernelPartitionExtractor INSTANCE = new DeltaKernelPartitionExtractor();
+ private static final String CAST_FUNCTION = "CAST(%s as DATE)";
+ private static final String DATE_FORMAT_FUNCTION = "DATE_FORMAT(%s, '%s')";
+ private static final String YEAR_FUNCTION = "YEAR(%s)";
+ private static final String DATE_FORMAT_FOR_HOUR = "yyyy-MM-dd-HH";
+ private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
+ private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
+ private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
+ private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
+ // For timestamp partition fields, the actual partition columns in the Delta table are
+ // generated columns with names like `xtable_partition_col_{transform_type}_{source_field_name}`.
+ private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s";
+ static final String DELTA_GENERATION_EXPRESSION = "delta.generationExpression";
+ private static final List<ParsedGeneratedExpr.GeneratedExprType> GRANULARITIES =
+ Arrays.asList(
+ ParsedGeneratedExpr.GeneratedExprType.YEAR,
+ ParsedGeneratedExpr.GeneratedExprType.MONTH,
+ ParsedGeneratedExpr.GeneratedExprType.DAY,
+ ParsedGeneratedExpr.GeneratedExprType.HOUR);
+
+ public static DeltaKernelPartitionExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Extracts partition fields from the Delta table. Partitioning by nested columns isn't
+ * supported. Example: given a Delta Kernel snapshot, the method parameters can be obtained as
+ * InternalSchema internalSchema =
+ * DeltaKernelSchemaExtractor.getInstance().toInternalSchema(snapshot.getSchema()); and a
+ * partition StructType assembled from the fields named in snapshot.getPartitionColumnNames().
+ *
+ * @param internalSchema canonical representation of the schema.
+ * @param partitionSchema partition schema of the delta table.
+ * @return list of canonical representation of the partition fields
+ */
+ public List<InternalPartitionField> convertFromDeltaPartitionFormat(
+ InternalSchema internalSchema, StructType partitionSchema) {
+ if (partitionSchema.fields().isEmpty()) {
+ return Collections.emptyList();
+ }
+ return getInternalPartitionFields(partitionSchema, internalSchema);
+ }
+
+ /**
+ * If all partition fields are plain values, they are processed individually. If MONTH is
+ * present, YEAR must be present as well; DAY additionally requires MONTH and YEAR; HOUR
+ * requires DAY, MONTH and YEAR. Otherwise CAST(col as DATE) and DATE_FORMAT(col,
+ * 'yyyy-MM-dd') are supported. Partitioning by nested fields may not be fully supported.
+ */
+ private List<InternalPartitionField> getInternalPartitionFields(
+ StructType partitionSchema, InternalSchema internalSchema) {
+ PeekingIterator<StructField> itr =
+ Iterators.peekingIterator(partitionSchema.fields().iterator());
+ List<InternalPartitionField> partitionFields = new ArrayList<>(partitionSchema.fields().size());
+ while (itr.hasNext()) {
+ StructField currPartitionField = itr.peek();
+ if (!currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
+ partitionFields.add(
+ InternalPartitionField.builder()
+ .sourceField(
+ SchemaFieldFinder.getInstance()
+ .findFieldByPath(internalSchema, currPartitionField.getName()))
+ .transformType(PartitionTransformType.VALUE)
+ .build());
+ itr.next(); // consume the field.
+ } else {
+ // Partition contains generated expression.
+ // if it starts with year we should consume until we hit field with no generated expression
+ // or we hit a field with generated expression that is of cast or date format.
+ String expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
+ ParsedGeneratedExpr parsedGeneratedExpr =
+ ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr);
+ if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType) {
+ partitionFields.add(
+ getPartitionWithDateTransform(
+ currPartitionField.getName(), parsedGeneratedExpr, internalSchema));
+ itr.next(); // consume the field.
+ } else if (ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
+ == parsedGeneratedExpr.generatedExprType) {
+ partitionFields.add(
+ getPartitionWithDateFormatTransform(
+ currPartitionField.getName(), parsedGeneratedExpr, internalSchema));
+ itr.next(); // consume the field.
+ } else {
+ // consume until we hit field with no generated expression or generated expression
+ // that is not of type cast or date format.
+ List<ParsedGeneratedExpr> parsedGeneratedExprs = new ArrayList<>();
+ while (itr.hasNext()
+ && currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
+ expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
+ parsedGeneratedExpr =
+ ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr);
+
+ if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType
+ || ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
+ == parsedGeneratedExpr.generatedExprType) {
+ break;
+ }
+ parsedGeneratedExprs.add(parsedGeneratedExpr);
+ itr.next(); // consume the field
+ if (itr.hasNext()) {
+ currPartitionField = itr.peek();
+ }
+ }
+ partitionFields.add(
+ getPartitionColumnsForHourOrDayOrMonthOrYear(parsedGeneratedExprs, internalSchema));
+ }
+ }
+ }
+ return partitionFields;
+ }
+
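+ // Example (hypothetical column names): a table partitioned by generated columns YEAR(ts),
+ // MONTH(ts) and DAY(ts) collapses into a single InternalPartitionField on source column `ts`
+ // with transform type DAY and partitionFieldNames [year_col, month_col, day_col].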
+ private InternalPartitionField getPartitionColumnsForHourOrDayOrMonthOrYear(
+ List<ParsedGeneratedExpr> parsedGeneratedExprs, InternalSchema internalSchema) {
+ if (parsedGeneratedExprs.size() > 4) {
+ throw new IllegalStateException("Invalid partition transform");
+ }
+ validate(
+ parsedGeneratedExprs, new HashSet<>(GRANULARITIES.subList(0, parsedGeneratedExprs.size())));
+
+ ParsedGeneratedExpr transform = parsedGeneratedExprs.get(0);
+ List<String> partitionColumns =
+ parsedGeneratedExprs.stream()
+ .map(parsedGeneratedExpr -> parsedGeneratedExpr.partitionColumnName)
+ .collect(toList(parsedGeneratedExprs.size()));
+ return InternalPartitionField.builder()
+ .sourceField(
+ SchemaFieldFinder.getInstance().findFieldByPath(internalSchema, transform.sourceColumn))
+ .partitionFieldNames(partitionColumns)
+ .transformType(
+ parsedGeneratedExprs.get(parsedGeneratedExprs.size() - 1)
+ .internalPartitionTransformType)
+ .build();
+ }
+
+ // Cast has default format of yyyy-MM-dd.
+ private InternalPartitionField getPartitionWithDateTransform(
+ String partitionColumnName,
+ ParsedGeneratedExpr parsedGeneratedExpr,
+ InternalSchema internalSchema) {
+ return InternalPartitionField.builder()
+ .sourceField(
+ SchemaFieldFinder.getInstance()
+ .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn))
+ .partitionFieldNames(Collections.singletonList(partitionColumnName))
+ .transformType(PartitionTransformType.DAY)
+ .build();
+ }
+
+ private InternalPartitionField getPartitionWithDateFormatTransform(
+ String partitionColumnName,
+ ParsedGeneratedExpr parsedGeneratedExpr,
+ InternalSchema internalSchema) {
+ return InternalPartitionField.builder()
+ .sourceField(
+ SchemaFieldFinder.getInstance()
+ .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn))
+ .partitionFieldNames(Collections.singletonList(partitionColumnName))
+ .transformType(parsedGeneratedExpr.internalPartitionTransformType)
+ .build();
+ }
+
+ public Map<String, StructField> convertToDeltaPartitionFormat(
+ List<InternalPartitionField> partitionFields) {
+ if (partitionFields == null) {
+ return null;
+ }
+ Map<String, StructField> nameToStructFieldMap = new HashMap<>();
+ for (InternalPartitionField internalPartitionField : partitionFields) {
+ String currPartitionColumnName;
+ StructField field;
+
+ if (internalPartitionField.getTransformType() == PartitionTransformType.VALUE) {
+ currPartitionColumnName = internalPartitionField.getSourceField().getName();
+ field = null;
+ } else {
+ // Since the partition field is of timestamp or bucket type, create a new field in the schema.
+ field = getGeneratedField(internalPartitionField);
+ currPartitionColumnName = field.getName();
+ }
+ nameToStructFieldMap.put(currPartitionColumnName, field);
+ }
+ return nameToStructFieldMap;
+ }
+
+ public Map<String, String> partitionValueSerialization(InternalDataFile internalDataFile) {
+ Map<String, String> partitionValuesSerialized = new HashMap<>();
+ if (internalDataFile.getPartitionValues() == null
+ || internalDataFile.getPartitionValues().isEmpty()) {
+ return partitionValuesSerialized;
+ }
+ for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) {
+ InternalPartitionField partitionField = partitionValue.getPartitionField();
+ PartitionTransformType transformType = partitionField.getTransformType();
+ String partitionValueSerialized;
+ if (transformType == PartitionTransformType.VALUE) {
+ partitionValueSerialized =
+ convertToDeltaPartitionValue(
+ partitionValue.getRange().getMaxValue(),
+ partitionField.getSourceField().getSchema().getDataType(),
+ transformType,
+ "");
+ partitionValuesSerialized.put(
+ partitionField.getSourceField().getName(), partitionValueSerialized);
+ } else if (transformType == PartitionTransformType.BUCKET) {
+ partitionValueSerialized = partitionValue.getRange().getMaxValue().toString();
+ partitionValuesSerialized.put(
+ getGeneratedColumnName(partitionField), partitionValueSerialized);
+ } else {
+ // use appropriate date formatter for value serialization.
+ partitionValueSerialized =
+ convertToDeltaPartitionValue(
+ partitionValue.getRange().getMaxValue(),
+ partitionField.getSourceField().getSchema().getDataType(),
+ transformType,
+ getDateFormat(partitionField.getTransformType()));
+ partitionValuesSerialized.put(
+ getGeneratedColumnName(partitionField), partitionValueSerialized);
+ }
+ }
+ return partitionValuesSerialized;
+ }
+
+ public List<PartitionValue> partitionValueExtraction(
+ scala.collection.Map<String, String> values, List<InternalPartitionField> partitionFields) {
+ return partitionFields.stream()
+ .map(
+ partitionField -> {
+ PartitionTransformType partitionTransformType = partitionField.getTransformType();
+ String dateFormat =
+ partitionTransformType.isTimeBased()
+ ? getDateFormat(partitionTransformType)
+ : null;
+ String serializedValue =
+ getSerializedPartitionValue(convertScalaMapToJavaMap(values), partitionField);
+ Object partitionValue =
+ convertFromDeltaPartitionValue(
+ serializedValue,
+ partitionField.getSourceField().getSchema().getDataType(),
+ partitionField.getTransformType(),
+ dateFormat);
+ return PartitionValue.builder()
+ .partitionField(partitionField)
+ .range(Range.scalar(partitionValue))
+ .build();
+ })
+ .collect(toList(partitionFields.size()));
+ }
+
+ private String getSerializedPartitionValue(
+ Map<String, String> values, InternalPartitionField partitionField) {
+ if (partitionField.getPartitionFieldNames() == null
+ || partitionField.getPartitionFieldNames().isEmpty()) {
+ return values.getOrDefault(partitionField.getSourceField().getName(), null);
+ }
+ List<String> partitionFieldNames = partitionField.getPartitionFieldNames();
+ if (partitionFieldNames.size() == 1) {
+ return values.getOrDefault(partitionFieldNames.get(0), null);
+ }
+ return partitionFieldNames.stream()
+ .map(name -> values.get(name))
+ .collect(Collectors.joining("-"));
+ }
+
+ private String getGeneratedColumnName(InternalPartitionField internalPartitionField) {
+ return String.format(
+ DELTA_PARTITION_COL_NAME_FORMAT,
+ internalPartitionField.getTransformType().toString(),
+ internalPartitionField.getSourceField().getName());
+ }
+
+ private String getDateFormat(PartitionTransformType transformType) {
+ switch (transformType) {
+ case YEAR:
+ return DATE_FORMAT_FOR_YEAR;
+ case MONTH:
+ return DATE_FORMAT_FOR_MONTH;
+ case DAY:
+ return DATE_FORMAT_FOR_DAY;
+ case HOUR:
+ return DATE_FORMAT_FOR_HOUR;
+ default:
+ throw new PartitionSpecException("Invalid transform type");
+ }
+ }
+
+ private StructField getGeneratedField(InternalPartitionField internalPartitionField) {
+ String generatedExpression;
+ DataType dataType;
+ String currPartitionColumnName = getGeneratedColumnName(internalPartitionField);
+ switch (internalPartitionField.getTransformType()) {
+ case YEAR:
+ generatedExpression =
+ String.format(YEAR_FUNCTION, internalPartitionField.getSourceField().getPath());
+ dataType = IntegerType.INTEGER;
+ break;
+ case MONTH:
+ case HOUR:
+ generatedExpression =
+ String.format(
+ DATE_FORMAT_FUNCTION,
+ internalPartitionField.getSourceField().getPath(),
+ getDateFormat(internalPartitionField.getTransformType()));
+ dataType = IntegerType.INTEGER;
+ break;
+ case DAY:
+ generatedExpression =
+ String.format(CAST_FUNCTION, internalPartitionField.getSourceField().getPath());
+ dataType = DateType.DATE;
+ break;
+ case BUCKET:
+ generatedExpression =
+ String.format(
+ BUCKET_FUNCTION,
+ internalPartitionField.getSourceField().getPath(),
+ Integer.MAX_VALUE,
+ (int)
+ internalPartitionField
+ .getTransformOptions()
+ .get(InternalPartitionField.NUM_BUCKETS));
+ dataType = IntegerType.INTEGER;
+ break;
+ default:
+ throw new PartitionSpecException("Invalid transform type");
+ }
+ // Attach the generation expression via the kernel's FieldMetadata so it is not dropped.
+ FieldMetadata partitionFieldMetadata =
+ FieldMetadata.builder()
+ .putString(DELTA_GENERATION_EXPRESSION, generatedExpression)
+ .build();
+ return new StructField(currPartitionColumnName, dataType, true, partitionFieldMetadata);
+ }
+
+ private void validate(
+ List<ParsedGeneratedExpr> parsedGeneratedExprs,
+ Set<ParsedGeneratedExpr.GeneratedExprType> expectedTypesToBePresent) {
+ Set<String> sourceFields =
+ parsedGeneratedExprs.stream().map(expr -> expr.sourceColumn).collect(Collectors.toSet());
+ if (sourceFields.size() > 1) {
+ log.error(
+ String.format("Multiple source columns found for partition transform: %s", sourceFields));
+ throw new PartitionSpecException(
+ String.format("Multiple source columns found for partition transform: %s", sourceFields));
+ }
+ Set<ParsedGeneratedExpr.GeneratedExprType> actualTypesPresent =
+ parsedGeneratedExprs.stream()
+ .map(expr -> expr.generatedExprType)
+ .collect(Collectors.toSet());
+ if (!actualTypesPresent.equals(expectedTypesToBePresent)) {
+ log.error(
+ "Mismatched types present. Expected: "
+ + expectedTypesToBePresent
+ + ", Found: "
+ + actualTypesPresent);
+ throw new PartitionSpecException(
+ "Mismatched types present. Expected: "
+ + expectedTypesToBePresent
+ + ", Found: "
+ + actualTypesPresent);
+ }
+ }
+
+ private Map<String, String> convertScalaMapToJavaMap(
+ scala.collection.Map<String, String> scalaMap) {
+ return JavaConverters.mapAsJavaMapConverter(scalaMap).asJava();
+ }
+
+ @Builder
+ static class ParsedGeneratedExpr {
+ private static final Pattern YEAR_PATTERN = Pattern.compile("YEAR\\(([^)]+)\\)");
+ private static final Pattern MONTH_PATTERN = Pattern.compile("MONTH\\(([^)]+)\\)");
+ private static final Pattern DAY_PATTERN = Pattern.compile("DAY\\(([^)]+)\\)");
+ private static final Pattern HOUR_PATTERN = Pattern.compile("HOUR\\(([^)]+)\\)");
+ private static final Pattern CAST_PATTERN = Pattern.compile("CAST\\(([^ ]+) AS DATE\\)");
+ private static final Pattern DATE_FORMAT_PATTERN =
+ Pattern.compile("DATE_FORMAT\\(([^,]+),[^']+'([^']+)'\\)");
+
+ enum GeneratedExprType {
+ YEAR,
+ MONTH,
+ DAY,
+ HOUR,
+ CAST,
+ DATE_FORMAT
+ }
+
+ String sourceColumn;
+ String partitionColumnName;
+ GeneratedExprType generatedExprType;
+ PartitionTransformType internalPartitionTransformType;
+
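+ // Examples: "YEAR(ts)" maps to a YEAR transform on column ts; "CAST(ts AS DATE)" maps to a
+ // DAY transform; "DATE_FORMAT(ts, 'yyyy-MM')" maps to a MONTH transform derived from the
+ // format string.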
+ private static ParsedGeneratedExpr buildFromString(String partitionColumnName, String expr) {
+ if (expr.contains("YEAR")) {
+ return ParsedGeneratedExpr.builder()
+ .generatedExprType(GeneratedExprType.YEAR)
+ .partitionColumnName(partitionColumnName)
+ .sourceColumn(extractColumnName(expr, YEAR_PATTERN))
+ .internalPartitionTransformType(PartitionTransformType.YEAR)
+ .build();
+ } else if (expr.contains("MONTH")) {
+ return ParsedGeneratedExpr.builder()
+ .generatedExprType(GeneratedExprType.MONTH)
+ .partitionColumnName(partitionColumnName)
+ .sourceColumn(extractColumnName(expr, MONTH_PATTERN))
+ .internalPartitionTransformType(PartitionTransformType.MONTH)
+ .build();
+ } else if (expr.contains("DAY")) {
+ return ParsedGeneratedExpr.builder()
+ .generatedExprType(GeneratedExprType.DAY)
+ .partitionColumnName(partitionColumnName)
+ .sourceColumn(extractColumnName(expr, DAY_PATTERN))
+ .internalPartitionTransformType(PartitionTransformType.DAY)
+ .build();
+ } else if (expr.contains("HOUR")) {
+ return ParsedGeneratedExpr.builder()
+ .generatedExprType(GeneratedExprType.HOUR)
+ .partitionColumnName(partitionColumnName)
+ .sourceColumn(extractColumnName(expr, HOUR_PATTERN))
+ .internalPartitionTransformType(PartitionTransformType.HOUR)
+ .build();
+ } else if (expr.contains("CAST")) {
+ return ParsedGeneratedExpr.builder()
+ .generatedExprType(GeneratedExprType.CAST)
+ .partitionColumnName(partitionColumnName)
+ .sourceColumn(extractColumnName(expr, CAST_PATTERN))
+ .internalPartitionTransformType(PartitionTransformType.DAY)
+ .build();
+ } else if (expr.contains("DATE_FORMAT")) {
+ Matcher matcher = DATE_FORMAT_PATTERN.matcher(expr);
+ if (matcher.find()) {
+ /*
+ * from DATE_FORMAT(source_col, 'yyyy-MM-dd-HH') the code below extracts yyyy-MM-dd-HH.
+ */
+ String fieldName = matcher.group(1);
+ String dateFormatExpr = matcher.group(2);
+ return ParsedGeneratedExpr.builder()
+ .generatedExprType(GeneratedExprType.DATE_FORMAT)
+ .partitionColumnName(partitionColumnName)
+ .sourceColumn(fieldName)
+ .internalPartitionTransformType(computeInternalPartitionTransform(dateFormatExpr))
+ .build();
+ } else {
+ throw new IllegalArgumentException("Could not extract values from: " + expr);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported expression for generated expression: " + expr);
+ }
+ }
+
+ // Supporting granularity as per https://docs.databricks.com/en/delta/generated-columns.html
+ private static PartitionTransformType computeInternalPartitionTransform(String dateFormatExpr) {
+ if (DATE_FORMAT_FOR_HOUR.equals(dateFormatExpr)) {
+ return PartitionTransformType.HOUR;
+ } else if (DATE_FORMAT_FOR_DAY.equals(dateFormatExpr)) {
+ return PartitionTransformType.DAY;
+ } else if (DATE_FORMAT_FOR_MONTH.equals(dateFormatExpr)) {
+ return PartitionTransformType.MONTH;
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported date format expression: %s for generated expression", dateFormatExpr));
+ }
+ }
+
+ private static String extractColumnName(String expr, Pattern regexPattern) {
+ Matcher matcher = regexPattern.matcher(expr);
+ if (matcher.find()) {
+ return matcher.group(1).trim();
+ }
+ throw new IllegalArgumentException(
+ "Could not extract column name from: "
+ + expr
+ + " using pattern: "
+ + regexPattern.pattern());
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelSchemaExtractor.java
new file mode 100644
index 000000000..5371a2b9b
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelSchemaExtractor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.util.*;
+
+import io.delta.kernel.types.*;
+
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+public class DeltaKernelSchemaExtractor {
+
+ private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
+ private static final DeltaKernelSchemaExtractor INSTANCE = new DeltaKernelSchemaExtractor();
+ private static final Map<InternalSchema.MetadataKey, Object>
+ DEFAULT_TIMESTAMP_PRECISION_METADATA =
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+
+ public static DeltaKernelSchemaExtractor getInstance() {
+ return INSTANCE;
+ }
+
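+ /** Converts a Delta Kernel {@link StructType} into the canonical {@link InternalSchema}. */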
+ public InternalSchema toInternalSchema(StructType structType) {
+ return toInternalSchema(structType, null, false, null, null);
+ }
+
+ private InternalSchema toInternalSchema(
+ DataType dataType,
+ String parentPath,
+ boolean nullable,
+ String comment,
+ FieldMetadata originalMetadata) {
+ // Use locals rather than shared instance state: this singleton is used recursively.
+ String trimmedTypeName = "";
+ InternalType type = null;
+ Map<InternalSchema.MetadataKey, Object> metadata = null;
+ List<InternalField> fields = null;
+
+ if (dataType instanceof IntegerType) {
+ type = InternalType.INT;
+ trimmedTypeName = "integer";
+ } else if (dataType instanceof StringType) {
+ type = InternalType.STRING;
+ trimmedTypeName = "string";
+ } else if (dataType instanceof BooleanType) {
+ type = InternalType.BOOLEAN;
+ trimmedTypeName = "boolean";
+ } else if (dataType instanceof FloatType) {
+ type = InternalType.FLOAT;
+ trimmedTypeName = "float";
+ } else if (dataType instanceof DoubleType) {
+ type = InternalType.DOUBLE;
+ trimmedTypeName = "double";
+ } else if (dataType instanceof BinaryType) {
+ if (originalMetadata != null
+ && originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)
+ && "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) {
+ type = InternalType.UUID;
+ trimmedTypeName = "binary";
+ } else {
+ type = InternalType.BYTES;
+ trimmedTypeName = "binary";
+ }
+ } else if (dataType instanceof LongType) {
+ type = InternalType.LONG;
+ trimmedTypeName = "long";
+ } else if (dataType instanceof DateType) {
+ type = InternalType.DATE;
+ trimmedTypeName = "date";
+ } else if (dataType instanceof TimestampType) {
+ type = InternalType.TIMESTAMP;
+ metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
+ trimmedTypeName = "timestamp";
+ } else if (dataType instanceof TimestampNTZType) {
+ type = InternalType.TIMESTAMP_NTZ;
+ metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
+ trimmedTypeName = "timestamp_ntz";
+ } else if (dataType instanceof StructType) {
+ StructType structType = (StructType) dataType;
+
+ fields =
+ structType.fields().stream()
+ .filter(
+ field ->
+ !field
+ .getMetadata()
+ .contains(DeltaKernelPartitionExtractor.DELTA_GENERATION_EXPRESSION))
+ .map(
+ field -> {
+ Integer fieldId =
+ field.getMetadata().contains(DELTA_COLUMN_MAPPING_ID)
+ ? Long.valueOf(field.getMetadata().getLong(DELTA_COLUMN_MAPPING_ID))
+ .intValue()
+ : null;
+ String fieldComment =
+ field.getMetadata().contains("comment")
+ ? field.getMetadata().getString("comment")
+ : null;
+ InternalSchema schema =
+ toInternalSchema(
+ field.getDataType(),
+ SchemaUtils.getFullyQualifiedPath(parentPath, field.getName()),
+ field.isNullable(),
+ fieldComment,
+ field.getMetadata());
+ return InternalField.builder()
+ .name(field.getName())
+ .fieldId(fieldId)
+ .parentPath(parentPath)
+ .schema(schema)
+ .defaultValue(
+ field.isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+ .build();
+ })
+ .collect(CustomCollectors.toList(structType.fields().size()));
+ type = InternalType.RECORD;
+ trimmedTypeName = "struct";
+ } else if (dataType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) dataType;
+ metadata = new HashMap<>(2, 1.0f);
+ metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, decimalType.getPrecision());
+ metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, decimalType.getScale());
+ type = InternalType.DECIMAL;
+ trimmedTypeName = "decimal";
+
+ } else if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ InternalSchema elementSchema =
+ toInternalSchema(
+ arrayType.getElementType(),
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
+ arrayType.containsNull(),
+ null,
+ null);
+ InternalField elementField =
+ InternalField.builder()
+ .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath(parentPath)
+ .schema(elementSchema)
+ .build();
+ type = InternalType.LIST;
+ fields = Collections.singletonList(elementField);
+ trimmedTypeName = "array";
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ InternalSchema keySchema =
+ toInternalSchema(
+ mapType.getKeyType(),
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath, InternalField.Constants.MAP_KEY_FIELD_NAME),
+ false,
+ null,
+ null);
+ InternalField keyField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .parentPath(parentPath)
+ .schema(keySchema)
+ .build();
+ InternalSchema valueSchema =
+ toInternalSchema(
+ mapType.getValueType(),
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+ mapType.isValueContainsNull(),
+ null,
+ null);
+ InternalField valueField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+ .parentPath(parentPath)
+ .schema(valueSchema)
+ .build();
+ type = InternalType.MAP;
+ fields = Arrays.asList(keyField, valueField);
+ trimmedTypeName = "map";
+ } else {
+ throw new NotSupportedException("Unsupported Delta Kernel data type: " + dataType);
+ }
+ return InternalSchema.builder()
+ .name(trimmedTypeName)
+ .dataType(type)
+ .comment(comment)
+ .isNullable(nullable)
+ .metadata(metadata)
+ .fields(fields)
+ .build();
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelStatsExtractor.java
new file mode 100644
index 000000000..bedc063f5
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelStatsExtractor.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+
+import io.delta.kernel.internal.actions.AddFile;
+
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.model.exception.ParseException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.FileStats;
+import org.apache.xtable.model.stat.Range;
+
+/**
+ * DeltaKernelStatsExtractor extracts column stats and is also responsible for their
+ * serialization, leveraging {@link DeltaValueConverter}.
+ */
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DeltaKernelStatsExtractor {
+ private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
+ new HashSet<>(
+ Arrays.asList(
+ InternalType.BOOLEAN,
+ InternalType.DATE,
+ InternalType.DECIMAL,
+ InternalType.DOUBLE,
+ InternalType.INT,
+ InternalType.LONG,
+ InternalType.FLOAT,
+ InternalType.STRING,
+ InternalType.TIMESTAMP,
+ InternalType.TIMESTAMP_NTZ));
+
+ private static final DeltaKernelStatsExtractor INSTANCE = new DeltaKernelStatsExtractor();
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /* This data structure collects the type names of all unrecognized Delta Lake stats. For
+ instance, data file stats in the presence of deletion vectors contain a 'tightBounds' stat,
+ which is currently not handled by XTable. */
+ private final Set<String> unsupportedStats = new HashSet<>();
+
+ public static DeltaKernelStatsExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public String convertStatsToDeltaFormat(
+ InternalSchema schema, long numRecords, List<ColumnStat> columnStats)
+ throws JsonProcessingException {
+ DeltaStats.DeltaStatsBuilder deltaStatsBuilder = DeltaStats.builder();
+ deltaStatsBuilder.numRecords(numRecords);
+ if (columnStats == null) {
+ return MAPPER.writeValueAsString(deltaStatsBuilder.build());
+ }
+ Set<String> validPaths = getPathsFromStructSchemaForMinAndMaxStats(schema);
+ List<ColumnStat> validColumnStats =
+ columnStats.stream()
+ .filter(stat -> validPaths.contains(stat.getField().getPath()))
+ .collect(Collectors.toList());
+ DeltaStats deltaStats =
+ deltaStatsBuilder
+ .minValues(getMinValues(validColumnStats))
+ .maxValues(getMaxValues(validColumnStats))
+ .nullCount(getNullCount(validColumnStats))
+ .build();
+ return MAPPER.writeValueAsString(deltaStats);
+ }
+
+ private Set<String> getPathsFromStructSchemaForMinAndMaxStats(InternalSchema schema) {
+ return schema.getAllFields().stream()
+ .filter(
+ field -> {
+ InternalType type = field.getSchema().getDataType();
+ return FIELD_TYPES_WITH_STATS_SUPPORT.contains(type);
+ })
+ .map(InternalField::getPath)
+ .collect(Collectors.toSet());
+ }
+
+ private Map<String, Object> getMinValues(List<ColumnStat> validColumnStats) {
+ return getValues(validColumnStats, columnStat -> columnStat.getRange().getMinValue());
+ }
+
+ private Map<String, Object> getMaxValues(List<ColumnStat> validColumnStats) {
+ return getValues(validColumnStats, columnStat -> columnStat.getRange().getMaxValue());
+ }
+
+ private Map<String, Object> getValues(
+ List<ColumnStat> validColumnStats, Function<ColumnStat, Object> valueExtractor) {
+ Map<String, Object> jsonObject = new HashMap<>();
+ validColumnStats.forEach(
+ columnStat -> {
+ InternalField field = columnStat.getField();
+ String[] pathParts = field.getPathParts();
+ insertValueAtPath(
+ jsonObject,
+ pathParts,
+ DeltaValueConverter.convertToDeltaColumnStatValue(
+ valueExtractor.apply(columnStat), field.getSchema()));
+ });
+ return jsonObject;
+ }
+
+ private Map<String, Object> getNullCount(List<ColumnStat> validColumnStats) {
+ // TODO: Additional work needed to track nulls in maps & arrays.
+ Map<String, Object> jsonObject = new HashMap<>();
+ validColumnStats.forEach(
+ columnStat -> {
+ String[] pathParts = columnStat.getField().getPathParts();
+ insertValueAtPath(jsonObject, pathParts, columnStat.getNumNulls());
+ });
+ return jsonObject;
+ }
+
+ private void insertValueAtPath(Map<String, Object> jsonObject, String[] pathParts, Object value) {
+ if (pathParts == null || pathParts.length == 0) {
+ return;
+ }
+ Map<String, Object> currObject = jsonObject;
+ for (int i = 0; i < pathParts.length; i++) {
+ String part = pathParts[i];
+ if (i == pathParts.length - 1) {
+ currObject.put(part, value);
+ } else {
+ if (!currObject.containsKey(part)) {
+ currObject.put(part, new HashMap<String, Object>());
+ }
+ try {
+ currObject = (Map<String, Object>) currObject.get(part);
+ } catch (ClassCastException e) {
+ throw new RuntimeException(
+ String.format(
+ "Cannot cast to hashmap while inserting stats at path %s",
+ String.join("->", pathParts)),
+ e);
+ }
+ }
+ }
+ }
+
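+ /**
+ * Parses the per-file stats JSON attached to an {@link AddFile} into {@link FileStats}. A
+ * typical (illustrative) stats string looks like:
+ * {"numRecords":100,"minValues":{"id":1},"maxValues":{"id":99},"nullCount":{"id":0}}.
+ */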
+ public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fields) {
+ Optional<String> statsOpt = addFile.getStatsJson();
+ if (!statsOpt.isPresent() || StringUtils.isEmpty(statsOpt.get())) {
+ // No statistics available
+ return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build();
+ }
+ // TODO: Additional work needed to track maps & arrays.
+ try {
+ DeltaStats deltaStats = MAPPER.readValue(statsOpt.get(), DeltaStats.class);
+
+ collectUnsupportedStats(deltaStats.getAdditionalStats());
+
+ Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
+ Map<String, Object> fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues());
+ Map<String, Object> fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount());
+ List<ColumnStat> columnStats =
+ fields.stream()
+ .filter(field -> fieldPathToMaxValue.containsKey(field.getPath()))
+ .map(
+ field -> {
+ String fieldPath = field.getPath();
+ Object minRaw = fieldPathToMinValue.get(fieldPath);
+ Object maxRaw = fieldPathToMaxValue.get(fieldPath);
+ Object nullCountRaw = fieldPathToNullCount.get(fieldPath);
+ Object minValue =
+ minRaw != null
+ ? DeltaValueConverter.convertFromDeltaColumnStatValue(
+ minRaw, field.getSchema())
+ : null;
+ Object maxValue =
+ maxRaw != null
+ ? DeltaValueConverter.convertFromDeltaColumnStatValue(
+ maxRaw, field.getSchema())
+ : null;
+ long nullCount =
+ nullCountRaw instanceof Number ? ((Number) nullCountRaw).longValue() : 0;
+ Range range = Range.vector(minValue, maxValue);
+ return ColumnStat.builder()
+ .field(field)
+ .numValues(deltaStats.getNumRecords())
+ .numNulls(nullCount)
+ .range(range)
+ .build();
+ })
+ .collect(CustomCollectors.toList(fields.size()));
+ return FileStats.builder()
+ .columnStats(columnStats)
+ .numRecords(deltaStats.getNumRecords())
+ .build();
+ } catch (IOException ex) {
+ throw new ParseException("Unable to parse stats json", ex);
+ }
+ }
+
+ private void collectUnsupportedStats(Map<String, Object> additionalStats) {
+ if (additionalStats == null || additionalStats.isEmpty()) {
+ return;
+ }
+
+ additionalStats.keySet().stream()
+ .filter(key -> !unsupportedStats.contains(key))
+ .forEach(
+ key -> {
+ log.info("Unrecognized/unsupported Delta data file stat: {}", key);
+ unsupportedStats.add(key);
+ });
+ }
+
+ /**
+ * Takes the input map which represents a json object and flattens it.
+ *
+ * @param statMap input json map
+ * @return map with keys representing the dot-path for the field
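+ * (for example, a nested {"a": {"b": 1}} flattens to {"a.b": 1})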
+ */
+ private Map<String, Object> flattenStatMap(Map<String, Object> statMap) {
+ Map<String, Object> result = new HashMap<>();
+ Queue<StatField> statFieldQueue = new ArrayDeque<>();
+ statFieldQueue.add(StatField.of("", statMap));
+ while (!statFieldQueue.isEmpty()) {
+ StatField statField = statFieldQueue.poll();
+ String prefix = statField.getParentPath().isEmpty() ? "" : statField.getParentPath() + ".";
+ statField
+ .getValues()
+ .forEach(
+ (fieldName, value) -> {
+ String fullName = prefix + fieldName;
+ if (value instanceof Map) {
+ statFieldQueue.add(StatField.of(fullName, (Map<String, Object>) value));
+ } else {
+ result.put(fullName, value);
+ }
+ });
+ }
+ return result;
+ }
+
+ /**
+ * Returns the names of all unsupported stats that have been discovered during the parsing of
+ * Delta Lake stats.
+ *
+ * @return set of unsupported stats
+ */
+ @VisibleForTesting
+ Set<String> getUnsupportedStats() {
+ return Collections.unmodifiableSet(unsupportedStats);
+ }
+
+ @Builder
+ @Value
+ private static class DeltaStats {
+ long numRecords;
+ Map<String, Object> minValues;
+ Map<String, Object> maxValues;
+ Map<String, Object> nullCount;
+
+ /* this is a catch-all for any additional stats that are not explicitly handled */
+ @JsonIgnore
+ @Getter(lazy = true)
+ Map<String, Object> additionalStats = new HashMap<>();
+
+ @JsonAnySetter
+ public void setAdditionalStat(String key, Object value) {
+ getAdditionalStats().put(key, value);
+ }
+ }
+
+ @Value
+ @AllArgsConstructor(staticName = "of")
+ private static class StatField {
+ String parentPath;
+ Map<String, Object> values;
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelTableExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelTableExtractor.java
new file mode 100644
index 000000000..f1e4ed780
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelTableExtractor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+
+import io.delta.kernel.*;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.TableFormat;
+
+/**
+ * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta.
+ */
+@Builder
+public class DeltaKernelTableExtractor {
+ @Builder.Default
+ private static final DeltaKernelSchemaExtractor schemaExtractor =
+ DeltaKernelSchemaExtractor.getInstance();
+
+ private final String basePath;
+
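+ /**
+ * Builds the canonical {@link InternalTable} for the given Delta Kernel snapshot, capturing
+ * the schema, partition fields, data layout strategy and latest commit time.
+ */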
+ public InternalTable table(
+ Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) {
+ try {
+ // Get the schema from Delta Kernel's snapshot.
+ StructType fullSchema = snapshot.getSchema();
+ InternalSchema internalSchema = schemaExtractor.toInternalSchema(fullSchema);
+ // Derive the partition schema from the snapshot's partition column names.
+ List<String> partitionColumns = snapshot.getPartitionColumnNames();
+ List<StructField> partitionStructFields =
+ fullSchema.fields().stream()
+ .filter(field -> partitionColumns.contains(field.getName()))
+ .collect(Collectors.toList());
+
+ StructType partitionSchema = new StructType(partitionStructFields);
+
+ List<InternalPartitionField> partitionFields =
+ DeltaKernelPartitionExtractor.getInstance()
+ .convertFromDeltaPartitionFormat(internalSchema, partitionSchema);
+
+ DataLayoutStrategy dataLayoutStrategy =
+ !partitionFields.isEmpty()
+ ? DataLayoutStrategy.HIVE_STYLE_PARTITION
+ : DataLayoutStrategy.FLAT;
+
+ // Get the timestamp
+ long timestamp = snapshot.getTimestamp(engine) * 1000; // Convert to milliseconds
+ return InternalTable.builder()
+ .tableFormat(TableFormat.DELTA)
+ .basePath(basePath)
+ .name(tableName)
+ .layoutStrategy(dataLayoutStrategy)
+ .partitioningFields(partitionFields)
+ .readSchema(internalSchema)
+ .latestCommitTime(Instant.ofEpochMilli(timestamp))
+ .latestMetadataPath(basePath + "/_delta_log")
+ .build();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to extract table information using Delta Kernel", e);
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index 1376f884e..3b770adf0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -18,11 +18,7 @@
package org.apache.xtable.delta;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -41,22 +37,10 @@
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.schema.SchemaUtils;
-/**
- * Converts between Delta and InternalTable schemas. Some items to be aware of:
- *
- * <ul>
- * <li>Delta schemas are represented as Spark StructTypes which do not have enums so the enum
- * types are lost when converting from XTable to Delta Lake representations
- * <li>Delta does not have a fixed length byte array option so {@link InternalType#FIXED} is
- * simply translated to a {@link org.apache.spark.sql.types.BinaryType}
- * <li>Similarly, {@link InternalType#TIMESTAMP_NTZ} is translated to a long in Delta Lake
- * </ul>
- */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
- // Timestamps in Delta are microsecond precision by default
private static final Map<InternalSchema.MetadataKey, Object>
DEFAULT_TIMESTAMP_PRECISION_METADATA =
Collections.singletonMap(
diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java
new file mode 100644
index 000000000..aa63cc581
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.kernel;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Builder;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.DeltaLogActionUtils;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.SingleAction;
+import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.replay.ActionsIterator;
+import io.delta.kernel.internal.util.FileNames;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+import org.apache.xtable.delta.*;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.spi.extractor.ConversionSource;
+import org.apache.xtable.spi.extractor.DataFileIterator;
+
+@Builder
+public class DeltaKernelConversionSource implements ConversionSource<Long> {
+
+ @Builder.Default
+ private final DeltaKernelDataFileExtractor dataFileExtractor =
+ DeltaKernelDataFileExtractor.builder().build();
+
+ @Builder.Default
+ private final DeltaKernelActionsConverter actionsConverter =
+ DeltaKernelActionsConverter.getInstance();
+
+ private final String basePath;
+ private final String tableName;
+ private final Engine engine;
+
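+ // Schema for decoding raw Delta log entries; SingleAction.FULL_SCHEMA carries one
+ // struct column per action type (add, remove, metaData, protocol, ...)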
+ private final StructType actionSchema = SingleAction.FULL_SCHEMA;
+
+ @Builder.Default
+ private final DeltaKernelTableExtractor tableExtractor =
+ DeltaKernelTableExtractor.builder().build();
+
+ private Optional<DeltaIncrementalChangesState> deltaIncrementalChangesState = Optional.empty();
+
+ @Override
+ public InternalTable getTable(Long version) {
+ Configuration hadoopConf = new Configuration();
+ try {
+ Engine engine = DefaultEngine.create(hadoopConf);
+ Table table = Table.forPath(engine, basePath);
+ Snapshot snapshot = table.getSnapshotAsOfVersion(engine, version);
+ return tableExtractor.table(table, snapshot, engine, tableName, basePath);
+ } catch (Exception e) {
+ throw new ReadException("Failed to get table at version " + version, e);
+ }
+ }
+
+ @Override
+ public InternalTable getCurrentTable() {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ Table table = Table.forPath(engine, basePath);
+ Snapshot snapshot = table.getLatestSnapshot(engine);
+ return getTable(snapshot.getVersion());
+ }
+
+ @Override
+ public InternalSnapshot getCurrentSnapshot() {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ Table kernelTable = Table.forPath(engine, basePath);
+ Snapshot snapshot = kernelTable.getLatestSnapshot(engine);
+ InternalTable table = getTable(snapshot.getVersion());
+ return InternalSnapshot.builder()
+ .table(table)
+ .partitionedDataFiles(
+ getInternalDataFiles(snapshot, kernelTable, engine, table.getReadSchema()))
+ .sourceIdentifier(getCommitIdentifier(snapshot.getVersion()))
+ .build();
+ }
+
+ @Override
+ public TableChange getTableChangeForCommit(Long versionNumber) {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ Table table = Table.forPath(engine, basePath);
+ Snapshot snapshot = table.getSnapshotAsOfVersion(engine, versionNumber);
+ InternalTable tableAtVersion =
+ tableExtractor.table(table, snapshot, engine, tableName, basePath);
+ Map<String, InternalDataFile> addedFiles = new HashMap<>();
+ String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
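+ // The commit's format provider (e.g. "parquet") maps to XTable's FileFormat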
+ FileFormat fileFormat = actionsConverter.convertToFileFormat(provider);
+ List<FileStatus> files =
+ DeltaLogActionUtils.listDeltaLogFilesAsIter(
+ engine,
+ Collections.singleton(FileNames.DeltaLogFileType.COMMIT),
+ new Path(basePath),
+ versionNumber,
+ Optional.of(versionNumber),
+ false)
+ .toInMemoryList();
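+ // Both the start and end version of the log listing are pinned to versionNumber,
+ // so only the commit (JSON) file for exactly this version is returned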
+
+ ActionsIterator actionsIterator =
+ new ActionsIterator(engine, files, actionSchema, Optional.empty());
+ while (actionsIterator.hasNext()) {
+ // Each ActionWrapper may wrap a batch of rows (actions)
+ CloseableIterator<Row> scanFileRows = actionsIterator.next().getColumnarBatch().getRows();
+ while (scanFileRows.hasNext()) {
+ Row scanFileRow = scanFileRows.next();
+ // Rows decoded with SingleAction.FULL_SCHEMA are generic Rows, never AddFile
+ // instances, so an instanceof check would never match. Select rows whose "add"
+ // column is populated and wrap that struct as an AddFile instead.
+ int addIndex = scanFileRow.getSchema().indexOf("add");
+ if (addIndex >= 0 && !scanFileRow.isNullAt(addIndex)) {
+ AddFile addFile = new AddFile(scanFileRow.getStruct(addIndex));
+ // AddFile#getPartitionValues returns a kernel MapValue; convert it to a Java map
+ Map<String, String> partitionValues = VectorUtils.toJavaMap(addFile.getPartitionValues());
+ InternalDataFile dataFile =
+ actionsConverter.convertAddActionToInternalDataFile(
+ addFile,
+ table,
+ fileFormat,
+ tableAtVersion.getPartitioningFields(),
+ tableAtVersion.getReadSchema().getFields(),
+ true,
+ DeltaKernelPartitionExtractor.getInstance(),
+ DeltaKernelStatsExtractor.getInstance(),
+ partitionValues);
+ addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+ }
+ }
+ }
+
+ InternalFilesDiff internalFilesDiff =
+ InternalFilesDiff.builder().filesAdded(addedFiles.values()).build();
+ return TableChange.builder()
+ .tableAsOfChange(tableAtVersion)
+ .filesDiff(internalFilesDiff)
+ .sourceIdentifier(getCommitIdentifier(versionNumber))
+ .build();
+ }
+
+ @Override
+ public CommitsBacklog<Long> getCommitsBacklog(
+ InstantsForIncrementalSync instantsForIncrementalSync) {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ Table table = Table.forPath(engine, basePath);
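+ // Resolve the last-sync instant to the latest version committed at or before it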
+ Snapshot snapshot =
+ table.getSnapshotAsOfTimestamp(
+ engine, Timestamp.from(instantsForIncrementalSync.getLastSyncInstant()).getTime());
+
+ long versionNumberAtLastSyncInstant = snapshot.getVersion();
+ // TODO: populate deltaIncrementalChangesState for versions after
+ // versionNumberAtLastSyncInstant; until that is wired up, getChangesState() throws.
+ return CommitsBacklog.<Long>builder()
+ .commitsToProcess(getChangesState().getVersionsInSortedOrder())
+ .build();
+ }
+
+ @Override
+ public boolean isIncrementalSyncSafeFrom(Instant instant) {
+ Configuration hadoopConf = new Configuration();
+ Engine engine = DefaultEngine.create(hadoopConf);
+ Table table = Table.forPath(engine, basePath);
+ Snapshot snapshot = table.getSnapshotAsOfTimestamp(engine, Timestamp.from(instant).getTime());
+
+ // There is a chance earliest commit of the table is returned if the instant is before the
+ // earliest commit of the table, hence the additional check.
+ Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine));
+ return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
+ }
+
+ @Override
+ public String getCommitIdentifier(Long commit) {
+ return String.valueOf(commit);
+ }
+
+
+ private List<PartitionFileGroup> getInternalDataFiles(
+ Snapshot snapshot, Table table, Engine engine, InternalSchema schema) {
+ try (DataFileIterator fileIterator =
+ dataFileExtractor.iterator(snapshot, table, engine, schema)) {
+
+ List<InternalDataFile> dataFiles = new ArrayList<>();
+ fileIterator.forEachRemaining(dataFiles::add);
+ return PartitionFileGroup.fromFiles(dataFiles);
+ } catch (Exception e) {
+ throw new ReadException("Failed to iterate through Delta data files", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ private DeltaIncrementalChangesState getChangesState() {
+ return deltaIncrementalChangesState.orElseThrow(
+ () -> new IllegalStateException("DeltaIncrementalChangesState is not initialized"));
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java b/xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java
new file mode 100644
index 000000000..050d12e64
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable;
+
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.delta.kernel.*;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.*;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public class DeltaTableKernel {
+ private static final Logger logger = LoggerFactory.getLogger(DeltaTableKernel.class);
+
+ @Test
+ public void readDeltaKernel() throws IOException {
+ String myTablePath =
+ "/Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis"; // fully qualified
+ Configuration hadoopConf = new Configuration();
+ Engine myEngine = DefaultEngine.create(hadoopConf);
+ Table myTable = Table.forPath(myEngine, myTablePath);
+ Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
+ long version = mySnapshot.getVersion();
+ StructType tableSchema = mySnapshot.getSchema();
+ logger.info("Reading Delta table version {} with schema {}", version, tableSchema);
+ Scan myScan = mySnapshot.getScanBuilder().build();
+
+ // Common information about scanning for all data files to read.
+ Row scanState = myScan.getScanState(myEngine);
+
+ // Information about the list of scan files to read
+ try (CloseableIterator<FilteredColumnarBatch> fileIter = myScan.getScanFiles(myEngine)) {
+ StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(myEngine, scanState);
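+ // Note: the physical read schema uses on-disk (Parquet) column names, which can
+ // differ from the logical table schema when column mapping is enabled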
+ while (fileIter.hasNext()) {
+ FilteredColumnarBatch scanFilesBatch = fileIter.next();
+ try (CloseableIterator<Row> scanFileRows = scanFilesBatch.getRows()) {
+ while (scanFileRows.hasNext()) {
+ Row scanFileRow = scanFileRows.next();
+ FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+ CloseableIterator<ColumnarBatch> physicalDataIter =
+ myEngine
+ .getParquetHandler()
+ .readParquetFiles(
+ singletonCloseableIterator(fileStatus),
+ physicalReadSchema,
+ Optional.empty());
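+ // Scan.transformPhysicalData converts physical rows back to the logical schema,
+ // e.g. populating partition columns that are not stored in the data files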
+ try (CloseableIterator<FilteredColumnarBatch> transformedData =
+ Scan.transformPhysicalData(myEngine, scanState, scanFileRow, physicalDataIter)) {
+ while (transformedData.hasNext()) {
+ FilteredColumnarBatch logicalData = transformedData.next();
+ ColumnarBatch dataBatch = logicalData.getData();
+
+ // Access the first two columns of the batch (an int and a string column
+ // are assumed here for this sample table)
+ ColumnVector column0 = dataBatch.getColumnVector(0);
+ ColumnVector column1 = dataBatch.getColumnVector(1);
+
+ for (int rowIndex = 0; rowIndex < column0.getSize(); rowIndex++) {
+ System.out.println(column0.getInt(rowIndex));
+ }
+ for (int rowIndex = 0; rowIndex < column1.getSize(); rowIndex++) {
+ System.out.println(column1.getString(rowIndex));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaKernelConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaKernelConversionSource.java
new file mode 100644
index 000000000..13ac7a059
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaKernelConversionSource.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.kernel.DeltaKernelConversionSource;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class ITDeltaKernelConversionSource {
+ private static final InternalField COL1_INT_FIELD =
+ InternalField.builder()
+ .name("col1")
+ .schema(
+ InternalSchema.builder()
+ .name("integer")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+
+ private static final InternalField COL2_INT_FIELD =
+ InternalField.builder()
+ .name("col2")
+ .schema(
+ InternalSchema.builder()
+ .name("integer")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+
+ private static final InternalField COL3_INT_FIELD =
+ InternalField.builder()
+ .name("col3")
+ .schema(
+ InternalSchema.builder()
+ .name("integer")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ private static final ColumnStat COL2_COLUMN_STAT =
+ ColumnStat.builder()
+ .field(COL2_INT_FIELD)
+ .range(Range.vector(2, 2))
+ .numNulls(0)
+ .numValues(1)
+ .totalSize(0)
+ .build();
+ private static final ColumnStat COL1_COLUMN_STAT =
+ ColumnStat.builder()
+ .field(COL1_INT_FIELD)
+ .range(Range.vector(1, 1))
+ .numNulls(0)
+ .numValues(1)
+ .totalSize(0)
+ .build();
+
+ private DeltaKernelConversionSourceProvider conversionSourceProvider;
+ private static SparkSession sparkSession;
+
+ @BeforeAll
+ public static void setupOnce() {
+ sparkSession =
+ SparkSession.builder()
+ .appName("TestDeltaTable")
+ .master("local[4]")
+ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+ .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+ .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
+ .config("spark.sql.shuffle.partitions", "1")
+ .config("spark.default.parallelism", "1")
+ .config("spark.serializer", KryoSerializer.class.getName())
+ .getOrCreate();
+ }
+
+ @TempDir private static Path tempDir;
+
+ @AfterAll
+ public static void tearDownSparkSession() {
+ if (sparkSession != null) {
+ sparkSession.catalog().clearCache();
+ sparkSession.stop();
+ }
+ }
+
+ @BeforeEach
+ void setUp() {
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set("fs.defaultFS", "file:///");
+
+ conversionSourceProvider = new DeltaKernelConversionSourceProvider();
+ conversionSourceProvider.init(hadoopConf);
+ }
+
+ @Test
+ void getCurrentSnapshotNonPartitionedTest() throws URISyntaxException {
+ // Table name
+ final String tableName = GenericTable.getTableName();
+ final Path basePath = tempDir.resolve(tableName);
+ // Create table with a single row using Spark
+ sparkSession.sql(
+ "CREATE TABLE `"
+ + tableName
+ + "` USING DELTA LOCATION '"
+ + basePath
+ + "' AS SELECT * FROM VALUES (1, 2)");
+ // Create Delta source
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaKernelConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ // Get current snapshot
+ InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+ // Validate table
+ List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD);
+ validateTable(
+ snapshot.getTable(),
+ tableName,
+ TableFormat.DELTA,
+ InternalSchema.builder()
+ .name("struct")
+ .dataType(InternalType.RECORD)
+ .fields(fields)
+ .build(),
+ DataLayoutStrategy.FLAT,
+ "file://" + basePath,
+ snapshot.getTable().getLatestMetadataPath(),
+ Collections.emptyList());
+ // Validate data files
+ List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT);
+ Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
+
+ validatePartitionDataFiles(
+ PartitionFileGroup.builder()
+ .files(
+ Collections.singletonList(
+ InternalDataFile.builder()
+ .physicalPath("file:/fake/path")
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .partitionValues(Collections.emptyList())
+ .fileSizeBytes(716)
+ .recordCount(1)
+ .columnStats(columnStats)
+ .build()))
+ .partitionValues(Collections.emptyList())
+ .build(),
+ snapshot.getPartitionedDataFiles().get(0));
+ }
+
+ @Test
+ void getCurrentTableTest() {
+ // Table name
+ final String tableName = GenericTable.getTableName();
+ final Path basePath = tempDir.resolve(tableName);
+ // Create table with a single row using Spark
+ sparkSession.sql(
+ "CREATE TABLE `"
+ + tableName
+ + "` USING DELTA LOCATION '"
+ + basePath
+ + "' AS SELECT * FROM VALUES (1, 2, 3)");
+ // Create Delta source
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaKernelConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ // Get current table
+ InternalTable internalTable = conversionSource.getCurrentTable();
+ List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD, COL3_INT_FIELD);
+ validateTable(
+ internalTable,
+ tableName,
+ TableFormat.DELTA,
+ InternalSchema.builder()
+ .name("struct")
+ .dataType(InternalType.RECORD)
+ .fields(fields)
+ .build(),
+ DataLayoutStrategy.FLAT,
+ "file://" + basePath,
+ internalTable.getLatestMetadataPath(),
+ Collections.emptyList());
+ }
+
+ @Test
+ void getCurrentSnapshotPartitionedTest() throws URISyntaxException {
+ // Table name
+ final String tableName = GenericTable.getTableName();
+ final Path basePath = tempDir.resolve(tableName);
+ // Create table with a single row using Spark
+ sparkSession.sql(
+ "CREATE TABLE `"
+ + tableName
+ + "` USING DELTA PARTITIONED BY (part_col)\n"
+ + "LOCATION '"
+ + basePath
+ + "' AS SELECT 'SingleValue' AS part_col, 1 AS col1, 2 AS col2");
+ // Create Delta source
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaKernelConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ // Get current snapshot
+ InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+ // Validate table
+ InternalField partCol =
+ InternalField.builder()
+ .name("part_col")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build();
+ List<InternalField> fields = Arrays.asList(partCol, COL1_INT_FIELD, COL2_INT_FIELD);
+ validateTable(
+ snapshot.getTable(),
+ tableName,
+ TableFormat.DELTA,
+ InternalSchema.builder()
+ .name("struct")
+ .dataType(InternalType.RECORD)
+ .fields(fields)
+ .build(),
+ DataLayoutStrategy.HIVE_STYLE_PARTITION,
+ "file://" + basePath,
+ snapshot.getTable().getLatestMetadataPath(),
+ Collections.singletonList(
+ InternalPartitionField.builder()
+ .sourceField(partCol)
+ .transformType(PartitionTransformType.VALUE)
+ .build()));
+ // Validate data files
+ List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT);
+ Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
+ List<PartitionValue> partitionValue =
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(
+ InternalPartitionField.builder()
+ .sourceField(partCol)
+ .transformType(PartitionTransformType.VALUE)
+ .build())
+ .range(Range.scalar("SingleValue"))
+ .build());
+ validatePartitionDataFiles(
+ PartitionFileGroup.builder()
+ .partitionValues(partitionValue)
+ .files(
+ Collections.singletonList(
+ InternalDataFile.builder()
+ .physicalPath("file:/fake/path")
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .partitionValues(partitionValue)
+ .fileSizeBytes(716)
+ .recordCount(1)
+ .columnStats(columnStats)
+ .build()))
+ .build(),
+ snapshot.getPartitionedDataFiles().get(0));
+ }
+
+ @ParameterizedTest
+ @MethodSource("testWithPartitionToggle")
+ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
+ String tableName = GenericTable.getTableName();
+ TestSparkDeltaTable testSparkDeltaTable =
+ new TestSparkDeltaTable(
+ tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
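+ // When partitioned, TestSparkDeltaTable derives a yearOfBirth column from birthDate,
+ // which the source should report as a YEAR transform on birthDate (validated below)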
+ List<List<String>> allActiveFiles = new ArrayList<>();
+ List<TableChange> allTableChanges = new ArrayList<>();
+ List<Row> rows = testSparkDeltaTable.insertRows(50);
+ Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
+ allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+
+ testSparkDeltaTable.insertRows(50);
+ allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+
+ testSparkDeltaTable.upsertRows(rows.subList(0, 20));
+ allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+
+ testSparkDeltaTable.insertRows(50);
+ allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+
+ testSparkDeltaTable.insertRows(50);
+ allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaKernelConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ assertEquals(200L, testSparkDeltaTable.getNumRows());
+ InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+
+ if (isPartitioned) {
+ validateDeltaPartitioning(internalSnapshot);
+ }
+ ValidationTestHelper.validateSnapshot(
+ internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+ // Get changes in incremental format.
+ InstantsForIncrementalSync instantsForIncrementalSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+ .build();
+ // TODO: enable once incremental sync is implemented for the Kernel source.
+ // CommitsBacklog<Long> commitsBacklog =
+ // conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+ // for (Long version : commitsBacklog.getCommitsToProcess()) {
+ // TableChange tableChange = conversionSource.getTableChangeForCommit(version);
+ // allTableChanges.add(tableChange);
+ // }
+ // ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
+ }
+
+ private void validateDeltaPartitioning(InternalSnapshot internalSnapshot) {
+ List<InternalPartitionField> partitionFields =
+ internalSnapshot.getTable().getPartitioningFields();
+ assertEquals(1, partitionFields.size());
+ InternalPartitionField partitionField = partitionFields.get(0);
+ assertEquals("birthDate", partitionField.getSourceField().getName());
+ assertEquals(PartitionTransformType.YEAR, partitionField.getTransformType());
+ }
+
+ private void validatePartitionDataFiles(
+ PartitionFileGroup expectedPartitionFiles, PartitionFileGroup actualPartitionFiles)
+ throws URISyntaxException {
+ assertEquals(
+ expectedPartitionFiles.getPartitionValues(), actualPartitionFiles.getPartitionValues());
+ validateDataFiles(expectedPartitionFiles.getDataFiles(), actualPartitionFiles.getDataFiles());
+ }
+
+ private void validateDataFiles(
+ List<InternalDataFile> expectedFiles, List<InternalDataFile> actualFiles)
+ throws URISyntaxException {
+ Assertions.assertEquals(expectedFiles.size(), actualFiles.size());
+ for (int i = 0; i < expectedFiles.size(); i++) {
+ InternalDataFile expected = expectedFiles.get(i);
+ InternalDataFile actual = actualFiles.get(i);
+ validatePropertiesDataFile(expected, actual);
+ }
+ }
+
+ private static Stream<Arguments> testWithPartitionToggle() {
+ return Stream.of(Arguments.of(false), Arguments.of(true));
+ }
+
+ private void validatePropertiesDataFile(InternalDataFile expected, InternalDataFile actual)
+ throws URISyntaxException {
+ Assertions.assertTrue(
+ Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(),
+ () -> "path == " + actual.getPhysicalPath() + " is not absolute");
+ Assertions.assertEquals(expected.getFileFormat(), actual.getFileFormat());
+ Assertions.assertEquals(expected.getPartitionValues(), actual.getPartitionValues());
+ Assertions.assertEquals(expected.getFileSizeBytes(), actual.getFileSizeBytes());
+ Assertions.assertEquals(expected.getRecordCount(), actual.getRecordCount());
+ Instant now = Instant.now();
+ long minRange = now.minus(1, ChronoUnit.HOURS).toEpochMilli();
+ long maxRange = now.toEpochMilli();
+ Assertions.assertTrue(
+ actual.getLastModified() > minRange && actual.getLastModified() <= maxRange,
+ () ->
+ "last modified == "
+ + actual.getLastModified()
+ + " is expected between "
+ + minRange
+ + " and "
+ + maxRange);
+ Assertions.assertEquals(expected.getColumnStats(), actual.getColumnStats());
+ }
+}
diff --git a/xtable-utilities/src/test/resources/my_config.yaml b/xtable-utilities/src/test/resources/my_config.yaml
index 1416c04c2..f0594eb9f 100644
--- a/xtable-utilities/src/test/resources/my_config.yaml
+++ b/xtable-utilities/src/test/resources/my_config.yaml
@@ -19,6 +19,6 @@ targetFormats:
- DELTA
datasets:
-
- tableBasePath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis
- tableDataPath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data
+ tableBasePath: /Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis
+ tableDataPath: /Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data
tableName: taxis
\ No newline at end of file