    diff --git a/pom.xml b/pom.xml
    index bed4d63b4..4c313f4c5 100644
    --- a/pom.xml
    +++ b/pom.xml
    @@ -53,7 +53,7 @@
         <module>xtable-utilities</module>
         <module>xtable-aws</module>
         <module>xtable-hive-metastore</module>
    -    <module>xtable-service</module>
    +
    @@ -713,7 +713,7 @@
         ${skipUTs}
    -    true
    +    false
         false
         120
    diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
    index 6bd5282c7..c8675e341 100644
    --- a/xtable-core/pom.xml
    +++ b/xtable-core/pom.xml
    @@ -110,6 +110,19 @@
           <scope>test</scope>
         </dependency>
    +
    +    <dependency>
    +      <groupId>io.delta</groupId>
    +      <artifactId>delta-kernel-api</artifactId>
    +      <version>4.0.0</version>
    +    </dependency>
    +
    +    <dependency>
    +      <groupId>io.delta</groupId>
    +      <artifactId>delta-kernel-defaults</artifactId>
    +      <version>4.0.0</version>
    +    </dependency>
    +
         <dependency>
           <groupId>org.apache.hadoop</groupId>
    
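    With the new delta-kernel-api and delta-kernel-defaults dependencies in place, every class below follows the same basic Kernel read pattern. As orientation, a minimal sketch (the table path is a placeholder):

    ```java
    import org.apache.hadoop.conf.Configuration;

    import io.delta.kernel.Snapshot;
    import io.delta.kernel.Table;
    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;

    public class KernelSmokeTest {
      public static void main(String[] args) {
        // DefaultEngine is the Hadoop-backed Engine implementation from delta-kernel-defaults.
        Engine engine = DefaultEngine.create(new Configuration());
        Table table = Table.forPath(engine, "/tmp/delta-table"); // placeholder path
        Snapshot snapshot = table.getLatestSnapshot(engine);
        System.out.println("version=" + snapshot.getVersion());
        System.out.println("schema=" + snapshot.getSchema());
      }
    }
    ```
    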
    diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelActionsConverter.java
    new file mode 100644
    index 000000000..6531ebb6e
    --- /dev/null
    +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelActionsConverter.java
    @@ -0,0 +1,108 @@
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.xtable.delta;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import lombok.AccessLevel;
    import lombok.NoArgsConstructor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import scala.collection.JavaConverters;

    import io.delta.kernel.Table;
    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;
    import io.delta.kernel.internal.actions.AddFile;

    import org.apache.xtable.exception.NotSupportedException;
    import org.apache.xtable.model.schema.InternalField;
    import org.apache.xtable.model.schema.InternalPartitionField;
    import org.apache.xtable.model.stat.ColumnStat;
    import org.apache.xtable.model.stat.FileStats;
    import org.apache.xtable.model.storage.FileFormat;
    import org.apache.xtable.model.storage.InternalDataFile;

    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public class DeltaKernelActionsConverter {
      private static final DeltaKernelActionsConverter INSTANCE = new DeltaKernelActionsConverter();

      public static DeltaKernelActionsConverter getInstance() {
        return INSTANCE;
      }

      public InternalDataFile convertAddActionToInternalDataFile(
          AddFile addFile,
          Table table,
          FileFormat fileFormat,
          List<InternalPartitionField> partitionFields,
          List<InternalField> fields,
          boolean includeColumnStats,
          DeltaKernelPartitionExtractor partitionExtractor,
          DeltaKernelStatsExtractor fileStatsExtractor,
          Map<String, String> partitionValues) {
        FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields);
        List<ColumnStat> columnStats =
            includeColumnStats ? fileStats.getColumnStats() : Collections.emptyList();
        long recordCount = fileStats.getNumRecords();
        // Converting the Java map straight to an immutable Scala map does not work here, so
        // convert to a mutable Scala map instead.
        scala.collection.mutable.Map<String, String> scalaMap =
            JavaConverters.mapAsScalaMap(partitionValues);

        return InternalDataFile.builder()
            .physicalPath(getFullPathToFile(addFile.getPath(), table))
            .fileFormat(fileFormat)
            .fileSizeBytes(addFile.getSize())
            .lastModified(addFile.getModificationTime())
            .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields))
            .columnStats(columnStats)
            .recordCount(recordCount)
            .build();
      }

      public FileFormat convertToFileFormat(String provider) {
        if (provider.equals("parquet")) {
          return FileFormat.APACHE_PARQUET;
        } else if (provider.equals("orc")) {
          return FileFormat.APACHE_ORC;
        }
        throw new NotSupportedException(
            String.format("delta file format %s is not recognized", provider));
      }

      static String getFullPathToFile(String dataFilePath, Table table) {
        Engine engine = DefaultEngine.create(new Configuration());
        String tableBasePath = table.getPath(engine);
        if (dataFilePath.startsWith(tableBasePath)) {
          return dataFilePath;
        }
        return tableBasePath + Path.SEPARATOR + dataFilePath;
      }
    }
    
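    The mutable-map workaround in convertAddActionToInternalDataFile comes down to how scala.collection.JavaConverters exposes Java maps from Java code. A small sketch of the behavior (assumes the Scala library is on the classpath, as it already is via the module's Spark dependencies):

    ```java
    import java.util.HashMap;
    import java.util.Map;

    import scala.collection.JavaConverters;

    public class ScalaMapInterop {
      public static void main(String[] args) {
        Map<String, String> partitionValues = new HashMap<>();
        partitionValues.put("date", "2024-01-01");

        // mapAsScalaMap wraps the Java map as a mutable Scala Map view; no copying happens,
        // and building an immutable scala.collection.immutable.Map would need an extra toMap step.
        scala.collection.mutable.Map<String, String> scalaMap =
            JavaConverters.mapAsScalaMap(partitionValues);

        // Changes to the underlying Java map are visible through the view.
        partitionValues.put("region", "us-east-1");
        System.out.println(scalaMap.size()); // 2
      }
    }
    ```
    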
    diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelConversionSourceProvider.java
    new file mode 100644
    index 000000000..c81353dac
    --- /dev/null
    +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelConversionSourceProvider.java
    @@ -0,0 +1,42 @@
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.xtable.delta;

    import org.apache.hadoop.conf.Configuration;

    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;

    import org.apache.xtable.conversion.ConversionSourceProvider;
    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.kernel.DeltaKernelConversionSource;

    public class DeltaKernelConversionSourceProvider extends ConversionSourceProvider<Long> {
      @Override
      public DeltaKernelConversionSource getConversionSourceInstance(SourceTable sourceTable) {
        Configuration hadoopConf = new Configuration();
        Engine engine = DefaultEngine.create(hadoopConf);
        return DeltaKernelConversionSource.builder()
            .tableName(sourceTable.getName())
            .basePath(sourceTable.getBasePath())
            .engine(engine)
            .build();
      }
    }
    
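    For context, a sketch of how the provider could be exercised; the SourceTable builder fields follow xtable's existing conversion API, and the table name and path are placeholders:

    ```java
    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.delta.DeltaKernelConversionSourceProvider;
    import org.apache.xtable.kernel.DeltaKernelConversionSource;
    import org.apache.xtable.model.InternalTable;

    public class ProviderExample {
      public static void main(String[] args) {
        SourceTable sourceTable =
            SourceTable.builder()
                .name("people") // placeholder table name
                .formatName("DELTA")
                .basePath("/tmp/delta-table/people") // placeholder path
                .build();
        DeltaKernelConversionSourceProvider provider = new DeltaKernelConversionSourceProvider();
        DeltaKernelConversionSource source = provider.getConversionSourceInstance(sourceTable);
        InternalTable current = source.getCurrentTable();
        System.out.println(current.getName() + " @ " + current.getLatestCommitTime());
      }
    }
    ```
    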
    diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelDataFileExtractor.java
    new file mode 100644
    index 000000000..ecc0c1276
    --- /dev/null
    +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelDataFileExtractor.java
    @@ -0,0 +1,155 @@
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.xtable.delta;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import lombok.Builder;

    import io.delta.kernel.Snapshot;
    import io.delta.kernel.Table;
    import io.delta.kernel.data.FilteredColumnarBatch;
    import io.delta.kernel.data.Row;
    import io.delta.kernel.engine.Engine;
    import io.delta.kernel.internal.InternalScanFileUtils;
    import io.delta.kernel.internal.ScanImpl;
    import io.delta.kernel.internal.SnapshotImpl;
    import io.delta.kernel.internal.actions.AddFile;
    import io.delta.kernel.types.StructField;
    import io.delta.kernel.types.StructType;
    import io.delta.kernel.utils.CloseableIterator;

    import org.apache.xtable.model.schema.InternalField;
    import org.apache.xtable.model.schema.InternalPartitionField;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.storage.FileFormat;
    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.spi.extractor.DataFileIterator;

    /** DeltaKernelDataFileExtractor lets the consumer iterate over the table's data files. */
    @Builder
    public class DeltaKernelDataFileExtractor {

      @Builder.Default
      private final DeltaKernelPartitionExtractor partitionExtractor =
          DeltaKernelPartitionExtractor.getInstance();

      @Builder.Default
      private final DeltaKernelStatsExtractor fileStatsExtractor =
          DeltaKernelStatsExtractor.getInstance();

      @Builder.Default
      private final DeltaKernelActionsConverter actionsConverter =
          DeltaKernelActionsConverter.getInstance();

      private final String basePath;

      /**
       * Initializes an iterator for Delta Lake files.
       *
       * @return Delta table file iterator
       */
      public DataFileIterator iterator(
          Snapshot deltaSnapshot, Table table, Engine engine, InternalSchema schema) {
        return new DeltaDataFileIterator(deltaSnapshot, table, engine, schema, true);
      }

      public class DeltaDataFileIterator implements DataFileIterator {
        private final FileFormat fileFormat;
        private final List<InternalField> fields;
        private final List<InternalPartitionField> partitionFields;
        private Iterator<InternalDataFile> dataFilesIterator = Collections.emptyIterator();

        private DeltaDataFileIterator(
            Snapshot snapshot,
            Table table,
            Engine engine,
            InternalSchema schema,
            boolean includeColumnStats) {
          String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
          this.fileFormat = actionsConverter.convertToFileFormat(provider);

          this.fields = schema.getFields();

          StructType fullSchema = snapshot.getSchema(); // The full table schema
          List<String> partitionColumns = snapshot.getPartitionColumnNames();

          List<StructField> partitionStructFields =
              fullSchema.fields().stream()
                  .filter(field -> partitionColumns.contains(field.getName()))
                  .collect(Collectors.toList());

          StructType partitionSchema = new StructType(partitionStructFields);

          this.partitionFields =
              partitionExtractor.convertFromDeltaPartitionFormat(schema, partitionSchema);

          ScanImpl myScan = (ScanImpl) snapshot.getScanBuilder().build();
          CloseableIterator<FilteredColumnarBatch> scanFiles =
              myScan.getScanFiles(engine, includeColumnStats);
    
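          // Note (illustrative): each batch row follows Kernel's scan file schema, roughly
          //   {add: {path, partitionValues, size, modificationTime, stats, ...}, tableRoot}
          // which is why the "add" struct is looked up by name below.
    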
          List<InternalDataFile> dataFiles = new ArrayList<>();
          // Populate the list of data files by iterating over the scan files.
          while (scanFiles.hasNext()) {
            FilteredColumnarBatch scanFileColumnarBatch = scanFiles.next();
            CloseableIterator<Row> scanFileRows = scanFileColumnarBatch.getRows();
            while (scanFileRows.hasNext()) {
              Row scanFileRow = scanFileRows.next();
              // From the scan file row, extract the file path, size and modification time metadata
              // needed to read the file.
              AddFile addFile =
                  new AddFile(scanFileRow.getStruct(scanFileRow.getSchema().indexOf("add")));
              Map<String, String> partitionValues =
                  InternalScanFileUtils.getPartitionValues(scanFileRow);
              // Convert the AddFile action to an InternalDataFile using the actionsConverter.
              dataFiles.add(
                  actionsConverter.convertAddActionToInternalDataFile(
                      addFile,
                      table,
                      fileFormat,
                      partitionFields,
                      fields,
                      includeColumnStats,
                      partitionExtractor,
                      fileStatsExtractor,
                      partitionValues));
            }
          }
          this.dataFilesIterator = dataFiles.iterator();
        }

        @Override
        public void close() throws Exception {}

        @Override
        public boolean hasNext() {
          return this.dataFilesIterator.hasNext();
        }

        @Override
        public InternalDataFile next() {
          return dataFilesIterator.next();
        }
      }
    }
    
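    End to end, the iterator can be driven like this — a sketch: the path is a placeholder, and the snapshot/schema wiring mirrors what DeltaKernelConversionSource does later in this change:

    ```java
    import org.apache.hadoop.conf.Configuration;

    import io.delta.kernel.Snapshot;
    import io.delta.kernel.Table;
    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;

    import org.apache.xtable.delta.DeltaKernelDataFileExtractor;
    import org.apache.xtable.delta.DeltaKernelSchemaExtractor;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.spi.extractor.DataFileIterator;

    public class ListDataFiles {
      public static void main(String[] args) throws Exception {
        Engine engine = DefaultEngine.create(new Configuration());
        Table table = Table.forPath(engine, "/tmp/delta-table"); // placeholder
        Snapshot snapshot = table.getLatestSnapshot(engine);
        InternalSchema schema =
            DeltaKernelSchemaExtractor.getInstance().toInternalSchema(snapshot.getSchema());
        DeltaKernelDataFileExtractor extractor =
            DeltaKernelDataFileExtractor.builder().basePath("/tmp/delta-table").build();
        // DataFileIterator is closeable, so try-with-resources keeps the scan tidy.
        try (DataFileIterator files = extractor.iterator(snapshot, table, engine, schema)) {
          while (files.hasNext()) {
            System.out.println(files.next().getPhysicalPath());
          }
        }
      }
    }
    ```
    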
    diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelPartitionExtractor.java
    new file mode 100644
    index 000000000..cf81b73a1
    --- /dev/null
    +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelPartitionExtractor.java
    @@ -0,0 +1,540 @@
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.xtable.delta;

    import static org.apache.xtable.collectors.CustomCollectors.toList;
    import static org.apache.xtable.delta.DeltaValueConverter.convertFromDeltaPartitionValue;
    import static org.apache.xtable.delta.DeltaValueConverter.convertToDeltaPartitionValue;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    import lombok.AccessLevel;
    import lombok.Builder;
    import lombok.NoArgsConstructor;
    import lombok.extern.log4j.Log4j2;

    import scala.collection.JavaConverters;

    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;

    import io.delta.kernel.types.*;
    import io.delta.kernel.types.FieldMetadata;

    import org.apache.xtable.exception.PartitionSpecException;
    import org.apache.xtable.model.schema.InternalPartitionField;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.schema.PartitionTransformType;
    import org.apache.xtable.model.stat.PartitionValue;
    import org.apache.xtable.model.stat.Range;
    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.schema.SchemaFieldFinder;

    @Log4j2
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public class DeltaKernelPartitionExtractor {
      private static final DeltaKernelPartitionExtractor INSTANCE = new DeltaKernelPartitionExtractor();
      private static final String CAST_FUNCTION = "CAST(%s as DATE)";
      private static final String DATE_FORMAT_FUNCTION = "DATE_FORMAT(%s, '%s')";
      private static final String YEAR_FUNCTION = "YEAR(%s)";
      private static final String DATE_FORMAT_FOR_HOUR = "yyyy-MM-dd-HH";
      private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
      private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
      private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
      private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
      // For timestamp partition fields, the actual partition columns in the Delta table are
      // generated columns with names like `xtable_partition_col_{transform_type}_{source_field_name}`.
      private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s";
      static final String DELTA_GENERATION_EXPRESSION = "delta.generationExpression";
      private static final List<ParsedGeneratedExpr.GeneratedExprType> GRANULARITIES =
          Arrays.asList(
              ParsedGeneratedExpr.GeneratedExprType.YEAR,
              ParsedGeneratedExpr.GeneratedExprType.MONTH,
              ParsedGeneratedExpr.GeneratedExprType.DAY,
              ParsedGeneratedExpr.GeneratedExprType.HOUR);

      public static DeltaKernelPartitionExtractor getInstance() {
        return INSTANCE;
      }

      /**
       * Extracts partition fields from the delta table. Partitioning by nested columns isn't
       * supported. Example: given a delta table and a reference to its DeltaLog, the method
       * parameters can be obtained by deltaLog = DeltaLog.forTable(spark, deltaTablePath);
       * InternalSchema internalSchema =
       * DeltaSchemaExtractor.getInstance().toInternalSchema(deltaLog.snapshot().schema()); StructType
       * partitionSchema = deltaLog.metadata().partitionSchema();
       *
       * @param internalSchema canonical representation of the schema.
       * @param partitionSchema partition schema of the delta table.
    
       * @return list of canonical representation of the partition fields
       */
      public List<InternalPartitionField> convertFromDeltaPartitionFormat(
          InternalSchema internalSchema, StructType partitionSchema) {
        if (partitionSchema.fields().isEmpty()) {
          return Collections.emptyList();
        }
        return getInternalPartitionFields(partitionSchema, internalSchema);
      }

      /**
       * If all the partition fields are plain VALUE transforms, they are processed individually and
       * returned. Granularities must nest: a MONTH field requires YEAR as well; a DAY field requires
       * MONTH and YEAR; an HOUR field requires DAY, MONTH and YEAR. Otherwise, CAST(col as DATE) and
       * DATE_FORMAT(col, 'yyyy-MM-dd') are supported. Partitioning by nested fields may not be fully
       * supported.
       */
      private List<InternalPartitionField> getInternalPartitionFields(
          StructType partitionSchema, InternalSchema internalSchema) {
        PeekingIterator<StructField> itr =
            Iterators.peekingIterator(partitionSchema.fields().iterator());
        List<InternalPartitionField> partitionFields =
            new ArrayList<>(partitionSchema.fields().size());
        while (itr.hasNext()) {
          StructField currPartitionField = itr.peek();
          if (!currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
            partitionFields.add(
                InternalPartitionField.builder()
                    .sourceField(
                        SchemaFieldFinder.getInstance()
                            .findFieldByPath(internalSchema, currPartitionField.getName()))
                    .transformType(PartitionTransformType.VALUE)
                    .build());
            itr.next(); // consume the field.
          } else {
            // The partition column is backed by a generated expression. If it starts with YEAR,
            // consume fields until we hit one with no generated expression, or one whose generated
            // expression is a CAST or DATE_FORMAT.
            String expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
            ParsedGeneratedExpr parsedGeneratedExpr =
                ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr);
            if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType) {
              partitionFields.add(
                  getPartitionWithDateTransform(
                      currPartitionField.getName(), parsedGeneratedExpr, internalSchema));
              itr.next(); // consume the field.
            } else if (ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
                == parsedGeneratedExpr.generatedExprType) {
              partitionFields.add(
                  getPartitionWithDateFormatTransform(
                      currPartitionField.getName(), parsedGeneratedExpr, internalSchema));
              itr.next(); // consume the field.
            } else {
              // Consume until we hit a field with no generated expression, or a generated
              // expression that is not of type CAST or DATE_FORMAT.
    
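              // For example (illustrative): generated columns backed by YEAR(ts), MONTH(ts) and
              // DAY(ts) are gathered here and collapsed into a single InternalPartitionField on
              // "ts" whose transform type is the finest granularity seen, i.e. DAY.
    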
              List<ParsedGeneratedExpr> parsedGeneratedExprs = new ArrayList<>();
              while (itr.hasNext()
                  && currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
                expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
                parsedGeneratedExpr =
                    ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr);

                if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType
                    || ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
                        == parsedGeneratedExpr.generatedExprType) {
                  break;
                }
                parsedGeneratedExprs.add(parsedGeneratedExpr);
                itr.next(); // consume the field
                if (itr.hasNext()) {
                  currPartitionField = itr.peek();
                }
              }
              partitionFields.add(
                  getPartitionColumnsForHourOrDayOrMonthOrYear(parsedGeneratedExprs, internalSchema));
            }
          }
        }
        return partitionFields;
      }

      private InternalPartitionField getPartitionColumnsForHourOrDayOrMonthOrYear(
          List<ParsedGeneratedExpr> parsedGeneratedExprs, InternalSchema internalSchema) {
        if (parsedGeneratedExprs.size() > 4) {
          throw new IllegalStateException("Invalid partition transform");
        }
        validate(
            parsedGeneratedExprs, new HashSet<>(GRANULARITIES.subList(0, parsedGeneratedExprs.size())));

        ParsedGeneratedExpr transform = parsedGeneratedExprs.get(0);
        List<String> partitionColumns =
            parsedGeneratedExprs.stream()
                .map(parsedGeneratedExpr -> parsedGeneratedExpr.partitionColumnName)
                .collect(toList(parsedGeneratedExprs.size()));
        return InternalPartitionField.builder()
            .sourceField(
                SchemaFieldFinder.getInstance().findFieldByPath(internalSchema, transform.sourceColumn))
            .partitionFieldNames(partitionColumns)
            .transformType(
                parsedGeneratedExprs.get(parsedGeneratedExprs.size() - 1)
                    .internalPartitionTransformType)
            .build();
      }

      // CAST has a default format of yyyy-MM-dd.
      private InternalPartitionField getPartitionWithDateTransform(
          String partitionColumnName,
          ParsedGeneratedExpr parsedGeneratedExpr,
          InternalSchema internalSchema) {
        return InternalPartitionField.builder()
            .sourceField(
                SchemaFieldFinder.getInstance()
                    .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn))
            .partitionFieldNames(Collections.singletonList(partitionColumnName))
            .transformType(PartitionTransformType.DAY)
            .build();
      }

      private InternalPartitionField getPartitionWithDateFormatTransform(
          String partitionColumnName,
          ParsedGeneratedExpr parsedGeneratedExpr,
          InternalSchema internalSchema) {
        return InternalPartitionField.builder()
            .sourceField(
                SchemaFieldFinder.getInstance()
                    .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn))
            .partitionFieldNames(Collections.singletonList(partitionColumnName))
            .transformType(parsedGeneratedExpr.internalPartitionTransformType)
            .build();
      }

      public Map<String, StructField> convertToDeltaPartitionFormat(
          List<InternalPartitionField> partitionFields) {
        if (partitionFields == null) {
          return null;
        }
        Map<String, StructField> nameToStructFieldMap = new HashMap<>();
        for (InternalPartitionField internalPartitionField : partitionFields) {
          String currPartitionColumnName;
          StructField field;

          if (internalPartitionField.getTransformType() == PartitionTransformType.VALUE) {
            currPartitionColumnName = internalPartitionField.getSourceField().getName();
            field = null;
          } else {
            // Since the partition field is of timestamp or bucket type, create a new field in the
            // schema.
    
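            // For example (illustrative): a MONTH transform on a source field "ts" becomes a
            // generated column named "xtable_partition_col_MONTH_ts" backed by
            // DATE_FORMAT(ts, 'yyyy-MM').
    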
+ field = getGeneratedField(internalPartitionField); + currPartitionColumnName = field.getName(); + } + nameToStructFieldMap.put(currPartitionColumnName, field); + } + return nameToStructFieldMap; + } + + public Map partitionValueSerialization(InternalDataFile internalDataFile) { + Map partitionValuesSerialized = new HashMap<>(); + if (internalDataFile.getPartitionValues() == null + || internalDataFile.getPartitionValues().isEmpty()) { + return partitionValuesSerialized; + } + for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) { + InternalPartitionField partitionField = partitionValue.getPartitionField(); + PartitionTransformType transformType = partitionField.getTransformType(); + String partitionValueSerialized; + if (transformType == PartitionTransformType.VALUE) { + partitionValueSerialized = + convertToDeltaPartitionValue( + partitionValue.getRange().getMaxValue(), + partitionField.getSourceField().getSchema().getDataType(), + transformType, + ""); + partitionValuesSerialized.put( + partitionField.getSourceField().getName(), partitionValueSerialized); + } else if (transformType == PartitionTransformType.BUCKET) { + partitionValueSerialized = partitionValue.getRange().getMaxValue().toString(); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); + } else { + // use appropriate date formatter for value serialization. + partitionValueSerialized = + convertToDeltaPartitionValue( + partitionValue.getRange().getMaxValue(), + partitionField.getSourceField().getSchema().getDataType(), + transformType, + getDateFormat(partitionField.getTransformType())); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); + } + } + return partitionValuesSerialized; + } + + public List partitionValueExtraction( + scala.collection.Map values, List partitionFields) { + return partitionFields.stream() + .map( + partitionField -> { + PartitionTransformType partitionTransformType = partitionField.getTransformType(); + String dateFormat = + partitionTransformType.isTimeBased() + ? 
    getDateFormat(partitionTransformType) : null;
                  String serializedValue =
                      getSerializedPartitionValue(convertScalaMapToJavaMap(values), partitionField);
                  Object partitionValue =
                      convertFromDeltaPartitionValue(
                          serializedValue,
                          partitionField.getSourceField().getSchema().getDataType(),
                          partitionField.getTransformType(),
                          dateFormat);
                  return PartitionValue.builder()
                      .partitionField(partitionField)
                      .range(Range.scalar(partitionValue))
                      .build();
                })
            .collect(toList(partitionFields.size()));
      }

      private String getSerializedPartitionValue(
          Map<String, String> values, InternalPartitionField partitionField) {
        if (partitionField.getPartitionFieldNames() == null
            || partitionField.getPartitionFieldNames().isEmpty()) {
          return values.getOrDefault(partitionField.getSourceField().getName(), null);
        }
        List<String> partitionFieldNames = partitionField.getPartitionFieldNames();
        if (partitionFieldNames.size() == 1) {
          return values.getOrDefault(partitionFieldNames.get(0), null);
        }
        return partitionFieldNames.stream()
            .map(name -> values.get(name))
            .collect(Collectors.joining("-"));
      }

      private String getGeneratedColumnName(InternalPartitionField internalPartitionField) {
        return String.format(
            DELTA_PARTITION_COL_NAME_FORMAT,
            internalPartitionField.getTransformType().toString(),
            internalPartitionField.getSourceField().getName());
      }

      private String getDateFormat(PartitionTransformType transformType) {
        switch (transformType) {
          case YEAR:
            return DATE_FORMAT_FOR_YEAR;
          case MONTH:
            return DATE_FORMAT_FOR_MONTH;
          case DAY:
            return DATE_FORMAT_FOR_DAY;
          case HOUR:
            return DATE_FORMAT_FOR_HOUR;
          default:
            throw new PartitionSpecException("Invalid transform type");
        }
      }

      private StructField getGeneratedField(InternalPartitionField internalPartitionField) {
        String generatedExpression;
        DataType dataType;
        String currPartitionColumnName = getGeneratedColumnName(internalPartitionField);
        switch (internalPartitionField.getTransformType()) {
          case YEAR:
            generatedExpression =
                String.format(YEAR_FUNCTION, internalPartitionField.getSourceField().getPath());
            dataType = IntegerType.INTEGER;
            break;
          case MONTH:
          case HOUR:
            generatedExpression =
                String.format(
                    DATE_FORMAT_FUNCTION,
                    internalPartitionField.getSourceField().getPath(),
                    getDateFormat(internalPartitionField.getTransformType()));
            dataType = IntegerType.INTEGER;
            break;
          case DAY:
            generatedExpression =
                String.format(CAST_FUNCTION, internalPartitionField.getSourceField().getPath());
            dataType = DateType.DATE;
            break;
          case BUCKET:
            generatedExpression =
                String.format(
                    BUCKET_FUNCTION,
                    internalPartitionField.getSourceField().getPath(),
                    Integer.MAX_VALUE,
                    (int)
                        internalPartitionField
                            .getTransformOptions()
                            .get(InternalPartitionField.NUM_BUCKETS));
            dataType = IntegerType.INTEGER;
            break;
          default:
            throw new PartitionSpecException("Invalid transform type");
        }
        // Attach the generation expression to the field metadata so the column is recognized as a
        // generated column.
        FieldMetadata partitionFieldMetadata =
            FieldMetadata.builder()
                .putString(DELTA_GENERATION_EXPRESSION, generatedExpression)
                .build();
        return new StructField(currPartitionColumnName, dataType, true, partitionFieldMetadata);
      }

      private void validate(
          List<ParsedGeneratedExpr> parsedGeneratedExprs,
          Set<ParsedGeneratedExpr.GeneratedExprType> expectedTypesToBePresent) {
        Set<String> sourceFields =
            parsedGeneratedExprs.stream().map(expr -> expr.sourceColumn).collect(Collectors.toSet());
        if (sourceFields.size() > 1) {
          log.error(
              String.format("Multiple source
    
columns found for partition transform: %s", sourceFields)); + throw new PartitionSpecException( + String.format("Multiple source columns found for partition transform: %s", sourceFields)); + } + Set actualTypesPresent = + parsedGeneratedExprs.stream() + .map(expr -> expr.generatedExprType) + .collect(Collectors.toSet()); + if (!actualTypesPresent.equals(expectedTypesToBePresent)) { + log.error( + "Mismatched types present. Expected: " + + expectedTypesToBePresent + + ", Found: " + + actualTypesPresent); + throw new PartitionSpecException( + "Mismatched types present. Expected: " + + expectedTypesToBePresent + + ", Found: " + + actualTypesPresent); + } + } + + private Map convertScalaMapToJavaMap( + scala.collection.Map scalaMap) { + return JavaConverters.mapAsJavaMapConverter(scalaMap).asJava(); + } + + @Builder + static class ParsedGeneratedExpr { + private static final Pattern YEAR_PATTERN = Pattern.compile("YEAR\\(([^)]+)\\)"); + private static final Pattern MONTH_PATTERN = Pattern.compile("MONTH\\(([^)]+)\\)"); + private static final Pattern DAY_PATTERN = Pattern.compile("DAY\\(([^)]+)\\)"); + private static final Pattern HOUR_PATTERN = Pattern.compile("HOUR\\(([^)]+)\\)"); + private static final Pattern CAST_PATTERN = Pattern.compile("CAST\\(([^ ]+) AS DATE\\)"); + private static final Pattern DATE_FORMAT_PATTERN = + Pattern.compile("DATE_FORMAT\\(([^,]+),[^']+'([^']+)'\\)"); + + enum GeneratedExprType { + YEAR, + MONTH, + DAY, + HOUR, + CAST, + DATE_FORMAT + } + + String sourceColumn; + String partitionColumnName; + GeneratedExprType generatedExprType; + PartitionTransformType internalPartitionTransformType; + + private static ParsedGeneratedExpr buildFromString(String partitionColumnName, String expr) { + if (expr.contains("YEAR")) { + return ParsedGeneratedExpr.builder() + .generatedExprType(GeneratedExprType.YEAR) + .partitionColumnName(partitionColumnName) + .sourceColumn(extractColumnName(expr, YEAR_PATTERN)) + .internalPartitionTransformType(PartitionTransformType.YEAR) + .build(); + } else if (expr.contains("MONTH")) { + return ParsedGeneratedExpr.builder() + .generatedExprType(GeneratedExprType.MONTH) + .partitionColumnName(partitionColumnName) + .sourceColumn(extractColumnName(expr, MONTH_PATTERN)) + .internalPartitionTransformType(PartitionTransformType.MONTH) + .build(); + } else if (expr.contains("DAY")) { + return ParsedGeneratedExpr.builder() + .generatedExprType(GeneratedExprType.DAY) + .partitionColumnName(partitionColumnName) + .sourceColumn(extractColumnName(expr, DAY_PATTERN)) + .internalPartitionTransformType(PartitionTransformType.DAY) + .build(); + } else if (expr.contains("HOUR")) { + return ParsedGeneratedExpr.builder() + .generatedExprType(GeneratedExprType.HOUR) + .partitionColumnName(partitionColumnName) + .sourceColumn(extractColumnName(expr, HOUR_PATTERN)) + .internalPartitionTransformType(PartitionTransformType.HOUR) + .build(); + } else if (expr.contains("CAST")) { + return ParsedGeneratedExpr.builder() + .generatedExprType(GeneratedExprType.CAST) + .partitionColumnName(partitionColumnName) + .sourceColumn(extractColumnName(expr, CAST_PATTERN)) + .internalPartitionTransformType(PartitionTransformType.DAY) + .build(); + } else if (expr.contains("DATE_FORMAT")) { + Matcher matcher = DATE_FORMAT_PATTERN.matcher(expr); + if (matcher.find()) { + /* + * from DATE_FORMAT(source_col, 'yyyy-MM-dd-HH') the code below extracts yyyy-MM-dd-HH. 
+ */ + String fieldName = matcher.group(1); + String dateFormatExpr = matcher.group(2); + return ParsedGeneratedExpr.builder() + .generatedExprType(GeneratedExprType.DATE_FORMAT) + .partitionColumnName(partitionColumnName) + .sourceColumn(fieldName) + .internalPartitionTransformType(computeInternalPartitionTransform(dateFormatExpr)) + .build(); + } else { + throw new IllegalArgumentException("Could not extract values from: " + expr); + } + } else { + throw new IllegalArgumentException( + "Unsupported expression for generated expression: " + expr); + } + } + + // Supporting granularity as per https://docs.databricks.com/en/delta/generated-columns.html + private static PartitionTransformType computeInternalPartitionTransform(String dateFormatExpr) { + if (DATE_FORMAT_FOR_HOUR.equals(dateFormatExpr)) { + return PartitionTransformType.HOUR; + } else if (DATE_FORMAT_FOR_DAY.equals(dateFormatExpr)) { + return PartitionTransformType.DAY; + } else if (DATE_FORMAT_FOR_MONTH.equals(dateFormatExpr)) { + return PartitionTransformType.MONTH; + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported date format expression: %s for generated expression", dateFormatExpr)); + } + } + + private static String extractColumnName(String expr, Pattern regexPattern) { + Matcher matcher = regexPattern.matcher(expr); + if (matcher.find()) { + return matcher.group(1).trim(); + } + throw new IllegalArgumentException( + "Could not extract column name from: " + + expr + + " using pattern: " + + regexPattern.pattern()); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelSchemaExtractor.java new file mode 100644 index 000000000..5371a2b9b --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelSchemaExtractor.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
     */

    package org.apache.xtable.delta;

    import java.util.*;

    import io.delta.kernel.types.*;

    import org.apache.xtable.collectors.CustomCollectors;
    import org.apache.xtable.model.schema.InternalField;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.schema.InternalType;
    import org.apache.xtable.schema.SchemaUtils;

    public class DeltaKernelSchemaExtractor {

      private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
      private static final DeltaKernelSchemaExtractor INSTANCE = new DeltaKernelSchemaExtractor();
      private static final Map<InternalSchema.MetadataKey, Object>
          DEFAULT_TIMESTAMP_PRECISION_METADATA =
              Collections.singletonMap(
                  InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);

      public static DeltaKernelSchemaExtractor getInstance() {
        return INSTANCE;
      }

      public InternalSchema toInternalSchema(StructType structType) {
        return toInternalSchema(structType, null, false, null, null);
      }

      private InternalSchema toInternalSchema(
          DataType dataType,
          String parentPath,
          boolean nullable,
          String comment,
          FieldMetadata originalMetadata) {

        // Keep the per-call state local; instance fields here would be clobbered by the recursive
        // calls made for nested struct, array and map types.
        String trimmedTypeName = "";
        InternalType type = null;
        Map<InternalSchema.MetadataKey, Object> metadata = null;
        List<InternalField> fields = null;

        if (dataType instanceof IntegerType) {
          type = InternalType.INT;
          trimmedTypeName = "integer";
        } else if (dataType instanceof StringType) {
          type = InternalType.STRING;
          trimmedTypeName = "string";
        } else if (dataType instanceof BooleanType) {
          type = InternalType.BOOLEAN;
          trimmedTypeName = "boolean";
        } else if (dataType instanceof FloatType) {
          type = InternalType.FLOAT;
          trimmedTypeName = "float";
        } else if (dataType instanceof DoubleType) {
          type = InternalType.DOUBLE;
          trimmedTypeName = "double";
        } else if (dataType instanceof BinaryType) {
          // originalMetadata is null for array/map element types, so guard before dereferencing.
          if (originalMetadata != null
              && originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)
              && "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) {
            type = InternalType.UUID;
          } else {
            type = InternalType.BYTES;
          }
          trimmedTypeName = "binary";
        } else if (dataType instanceof LongType) {
          type = InternalType.LONG;
          trimmedTypeName = "long";
        } else if (dataType instanceof DateType) {
          type = InternalType.DATE;
          trimmedTypeName = "date";
        } else if (dataType instanceof TimestampType) {
          type = InternalType.TIMESTAMP;
          metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
          trimmedTypeName = "timestamp";
        } else if (dataType instanceof TimestampNTZType) {
          type = InternalType.TIMESTAMP_NTZ;
          metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
          trimmedTypeName = "timestamp_ntz";
        } else if (dataType instanceof StructType) {
          StructType structType = (StructType) dataType;

          fields =
              structType.fields().stream()
                  .filter(
                      field ->
                          !field
                              .getMetadata()
                              .contains(DeltaPartitionExtractor.DELTA_GENERATION_EXPRESSION))
                  .map(
                      field -> {
                        Integer fieldId =
                            field.getMetadata().contains(DELTA_COLUMN_MAPPING_ID)
                                ? Long.valueOf(field.getMetadata().getLong(DELTA_COLUMN_MAPPING_ID))
                                    .intValue()
                                : null;
                        String fieldComment =
                            field.getMetadata().contains("comment")
                                ?
    
    field.getMetadata().getString("comment")
                                : null;
                        InternalSchema schema =
                            toInternalSchema(
                                field.getDataType(),
                                SchemaUtils.getFullyQualifiedPath(parentPath, field.getName()),
                                field.isNullable(),
                                fieldComment,
                                field.getMetadata());
                        return InternalField.builder()
                            .name(field.getName())
                            .fieldId(fieldId)
                            .parentPath(parentPath)
                            .schema(schema)
                            .defaultValue(
                                field.isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
                            .build();
                      })
                  .collect(CustomCollectors.toList(structType.fields().size()));
          type = InternalType.RECORD;
          trimmedTypeName = "struct";
        } else if (dataType instanceof DecimalType) {
          DecimalType decimalType = (DecimalType) dataType;
          metadata = new HashMap<>(2, 1.0f);
          metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, decimalType.getPrecision());
          metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, decimalType.getScale());
          type = InternalType.DECIMAL;
          trimmedTypeName = "decimal";
        } else if (dataType instanceof ArrayType) {
          ArrayType arrayType = (ArrayType) dataType;
          InternalSchema elementSchema =
              toInternalSchema(
                  arrayType.getElementType(),
                  SchemaUtils.getFullyQualifiedPath(
                      parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
                  arrayType.containsNull(),
                  null,
                  null);
          InternalField elementField =
              InternalField.builder()
                  .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
                  .parentPath(parentPath)
                  .schema(elementSchema)
                  .build();
          type = InternalType.LIST;
          fields = Collections.singletonList(elementField);
          trimmedTypeName = "array";
        } else if (dataType instanceof MapType) {
          MapType mapType = (MapType) dataType;
          InternalSchema keySchema =
              toInternalSchema(
                  mapType.getKeyType(),
                  SchemaUtils.getFullyQualifiedPath(
                      parentPath, InternalField.Constants.MAP_KEY_FIELD_NAME),
                  false,
                  null,
                  null);
          InternalField keyField =
              InternalField.builder()
                  .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
                  .parentPath(parentPath)
                  .schema(keySchema)
                  .build();
          InternalSchema valueSchema =
              toInternalSchema(
                  mapType.getValueType(),
                  SchemaUtils.getFullyQualifiedPath(
                      parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
                  mapType.isValueContainsNull(),
                  null,
                  null);
          InternalField valueField =
              InternalField.builder()
                  .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
                  .parentPath(parentPath)
                  .schema(valueSchema)
                  .build();
          type = InternalType.MAP;
          fields = Arrays.asList(keyField, valueField);
          trimmedTypeName = "map";
        }
        return InternalSchema.builder()
            .name(trimmedTypeName)
            .dataType(type)
            .comment(comment)
            .isNullable(nullable)
            .metadata(metadata)
            .fields(fields)
            .build();
      }
    }
    
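    To make the type mapping concrete, here is a sketch of the extractor run on a small Kernel schema (column names are illustrative):

    ```java
    import io.delta.kernel.types.IntegerType;
    import io.delta.kernel.types.StringType;
    import io.delta.kernel.types.StructType;

    import org.apache.xtable.delta.DeltaKernelSchemaExtractor;
    import org.apache.xtable.model.schema.InternalSchema;

    public class SchemaMappingExample {
      public static void main(String[] args) {
        // Kernel's StructType.add(name, type[, nullable]) builds a schema column by column.
        StructType kernelSchema =
            new StructType()
                .add("id", IntegerType.INTEGER)
                .add("name", StringType.STRING, true /* nullable */);
        InternalSchema internal =
            DeltaKernelSchemaExtractor.getInstance().toInternalSchema(kernelSchema);
        System.out.println(internal.getDataType()); // RECORD
        System.out.println(internal.getFields().size()); // 2
      }
    }
    ```
    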
    diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelStatsExtractor.java
    new file mode 100644
    index 000000000..bedc063f5
    --- /dev/null
    +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelStatsExtractor.java
    @@ -0,0 +1,314 @@
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.xtable.delta;

    import java.io.IOException;
    import java.util.*;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    import lombok.AccessLevel;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Value;
    import lombok.extern.log4j.Log4j2;

    import org.apache.commons.lang3.StringUtils;

    import com.fasterxml.jackson.annotation.JsonAnySetter;
    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.annotations.VisibleForTesting;

    import io.delta.kernel.internal.actions.AddFile;

    import org.apache.xtable.collectors.CustomCollectors;
    import org.apache.xtable.model.exception.ParseException;
    import org.apache.xtable.model.schema.InternalField;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.schema.InternalType;
    import org.apache.xtable.model.stat.ColumnStat;
    import org.apache.xtable.model.stat.FileStats;
    import org.apache.xtable.model.stat.Range;

    /**
     * DeltaKernelStatsExtractor extracts column stats and is also responsible for their
     * serialization, leveraging {@link DeltaValueConverter}.
     */
    @Log4j2
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public class DeltaKernelStatsExtractor {
      private static final Set<InternalType> FIELD_TYPES_WITH_STATS_SUPPORT =
          new HashSet<>(
              Arrays.asList(
                  InternalType.BOOLEAN,
                  InternalType.DATE,
                  InternalType.DECIMAL,
                  InternalType.DOUBLE,
                  InternalType.INT,
                  InternalType.LONG,
                  InternalType.FLOAT,
                  InternalType.STRING,
                  InternalType.TIMESTAMP,
                  InternalType.TIMESTAMP_NTZ));

      private static final DeltaKernelStatsExtractor INSTANCE = new DeltaKernelStatsExtractor();

      private static final ObjectMapper MAPPER = new ObjectMapper();

      /* this data structure collects type names of all unrecognized Delta Lake stats.
    
For instance + data file stats in presence of delete vectors would contain 'tightBounds' stat which is + currently not handled by XTable */ + private final Set unsupportedStats = new HashSet<>(); + + public static DeltaKernelStatsExtractor getInstance() { + return INSTANCE; + } + + public String convertStatsToDeltaFormat( + InternalSchema schema, long numRecords, List columnStats) + throws JsonProcessingException { + DeltaStats.DeltaStatsBuilder deltaStatsBuilder = DeltaStats.builder(); + deltaStatsBuilder.numRecords(numRecords); + if (columnStats == null) { + return MAPPER.writeValueAsString(deltaStatsBuilder.build()); + } + Set validPaths = getPathsFromStructSchemaForMinAndMaxStats(schema); + List validColumnStats = + columnStats.stream() + .filter(stat -> validPaths.contains(stat.getField().getPath())) + .collect(Collectors.toList()); + DeltaStats deltaStats = + deltaStatsBuilder + .minValues(getMinValues(validColumnStats)) + .maxValues(getMaxValues(validColumnStats)) + .nullCount(getNullCount(validColumnStats)) + .build(); + return MAPPER.writeValueAsString(deltaStats); + } + + private Set getPathsFromStructSchemaForMinAndMaxStats(InternalSchema schema) { + return schema.getAllFields().stream() + .filter( + field -> { + InternalType type = field.getSchema().getDataType(); + return FIELD_TYPES_WITH_STATS_SUPPORT.contains(type); + }) + .map(InternalField::getPath) + .collect(Collectors.toSet()); + } + + private Map getMinValues(List validColumnStats) { + return getValues(validColumnStats, columnStat -> columnStat.getRange().getMinValue()); + } + + private Map getMaxValues(List validColumnStats) { + return getValues(validColumnStats, columnStat -> columnStat.getRange().getMaxValue()); + } + + private Map getValues( + List validColumnStats, Function valueExtractor) { + Map jsonObject = new HashMap<>(); + validColumnStats.forEach( + columnStat -> { + InternalField field = columnStat.getField(); + String[] pathParts = field.getPathParts(); + insertValueAtPath( + jsonObject, + pathParts, + DeltaValueConverter.convertToDeltaColumnStatValue( + valueExtractor.apply(columnStat), field.getSchema())); + }); + return jsonObject; + } + + private Map getNullCount(List validColumnStats) { + // TODO: Additional work needed to track nulls maps & arrays. 
+ Map jsonObject = new HashMap<>(); + validColumnStats.forEach( + columnStat -> { + String[] pathParts = columnStat.getField().getPathParts(); + insertValueAtPath(jsonObject, pathParts, columnStat.getNumNulls()); + }); + return jsonObject; + } + + private void insertValueAtPath(Map jsonObject, String[] pathParts, Object value) { + if (pathParts == null || pathParts.length == 0) { + return; + } + Map currObject = jsonObject; + for (int i = 0; i < pathParts.length; i++) { + String part = pathParts[i]; + if (i == pathParts.length - 1) { + currObject.put(part, value); + } else { + if (!currObject.containsKey(part)) { + currObject.put(part, new HashMap()); + } + try { + currObject = (HashMap) currObject.get(part); + } catch (ClassCastException e) { + throw new RuntimeException( + String.format( + "Cannot cast to hashmap while inserting stats at path %s", + String.join("->", pathParts)), + e); + } + } + } + } + + public FileStats getColumnStatsForFile(AddFile addFile, List fields) { + + Optional statsOpt = addFile.getStatsJson(); + if (!statsOpt.isPresent() || StringUtils.isEmpty(statsOpt.get())) { + // No statistics available + return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build(); + } + // TODO: Additional work needed to track maps & arrays. + try { + DeltaStats deltaStats = MAPPER.readValue(statsOpt.get(), DeltaStats.class); + + collectUnsupportedStats(deltaStats.getAdditionalStats()); + + Map fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues()); + Map fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues()); + Map fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount()); + List columnStats = + fields.stream() + .filter(field -> fieldPathToMaxValue.containsKey(field.getPath())) + .map( + field -> { + String fieldPath = field.getPath(); + Object minRaw = fieldPathToMinValue.get(fieldPath); + Object maxRaw = fieldPathToMaxValue.get(fieldPath); + Object nullCountRaw = fieldPathToNullCount.get(fieldPath); + Object minValue = + minRaw != null + ? DeltaValueConverter.convertFromDeltaColumnStatValue( + minRaw, field.getSchema()) + : null; + Object maxValue = + maxRaw != null + ? DeltaValueConverter.convertFromDeltaColumnStatValue( + maxRaw, field.getSchema()) + : null; + long nullCount = + nullCountRaw instanceof Number ? ((Number) nullCountRaw).longValue() : 0; + Range range = Range.vector(minValue, maxValue); + return ColumnStat.builder() + .field(field) + .numValues(deltaStats.getNumRecords()) + .numNulls(nullCount) + .range(range) + .build(); + }) + .collect(CustomCollectors.toList(fields.size())); + return FileStats.builder() + .columnStats(columnStats) + .numRecords(deltaStats.getNumRecords()) + .build(); + } catch (IOException ex) { + throw new ParseException("Unable to parse stats json", ex); + } + } + + private void collectUnsupportedStats(Map additionalStats) { + if (additionalStats == null || additionalStats.isEmpty()) { + return; + } + + additionalStats.keySet().stream() + .filter(key -> !unsupportedStats.contains(key)) + .forEach( + key -> { + log.info("Unrecognized/unsupported Delta data file stat: {}", key); + unsupportedStats.add(key); + }); + } + + /** + * Takes the input map which represents a json object and flattens it. 
+ * + * @param statMap input json map + * @return map with keys representing the dot-path for the field + */ + private Map flattenStatMap(Map statMap) { + Map result = new HashMap<>(); + Queue statFieldQueue = new ArrayDeque<>(); + statFieldQueue.add(StatField.of("", statMap)); + while (!statFieldQueue.isEmpty()) { + StatField statField = statFieldQueue.poll(); + String prefix = statField.getParentPath().isEmpty() ? "" : statField.getParentPath() + "."; + statField + .getValues() + .forEach( + (fieldName, value) -> { + String fullName = prefix + fieldName; + if (value instanceof Map) { + statFieldQueue.add(StatField.of(fullName, (Map) value)); + } else { + result.put(fullName, value); + } + }); + } + return result; + } + + /** + * Returns the names of all unsupported stats that have been discovered during the parsing of + * Delta Lake stats. + * + * @return set of unsupported stats + */ + @VisibleForTesting + Set getUnsupportedStats() { + return Collections.unmodifiableSet(unsupportedStats); + } + + @Builder + @Value + private static class DeltaStats { + long numRecords; + Map minValues; + Map maxValues; + Map nullCount; + + /* this is a catch-all for any additional stats that are not explicitly handled */ + @JsonIgnore + @Getter(lazy = true) + Map additionalStats = new HashMap<>(); + + @JsonAnySetter + public void setAdditionalStat(String key, Object value) { + getAdditionalStats().put(key, value); + } + } + + @Value + @AllArgsConstructor(staticName = "of") + private static class StatField { + String parentPath; + Map values; + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelTableExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelTableExtractor.java new file mode 100644 index 000000000..f1e4ed780 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaKernelTableExtractor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.delta; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. 
+ */ +@Builder +public class DeltaKernelTableExtractor { + @Builder.Default + private static final DeltaKernelSchemaExtractor schemaExtractor = + DeltaKernelSchemaExtractor.getInstance(); + + private final String basePath; + + public InternalTable table( + Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) { + try { + // Get schema from Delta Kernel's snapshot + io.delta.kernel.types.StructType schema = snapshot.getSchema(); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema); + // Get partition columns); + StructType fullSchema = snapshot.getSchema(); // The full table schema + List partitionColumns = snapshot.getPartitionColumnNames(); // List + + List partitionFields_strfld = + fullSchema.fields().stream() + .filter(field -> partitionColumns.contains(field.getName())) + .collect(Collectors.toList()); + + StructType partitionSchema = new StructType(partitionFields_strfld); + + List partitionFields = + DeltaKernelPartitionExtractor.getInstance() + .convertFromDeltaPartitionFormat(internalSchema, partitionSchema); + + DataLayoutStrategy dataLayoutStrategy = + !partitionFields.isEmpty() + ? DataLayoutStrategy.HIVE_STYLE_PARTITION + : DataLayoutStrategy.FLAT; + + // Get the timestamp + long timestamp = snapshot.getTimestamp(engine) * 1000; // Convert to milliseconds + return InternalTable.builder() + .tableFormat(TableFormat.DELTA) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(internalSchema) + .latestCommitTime(Instant.ofEpochMilli(timestamp)) + .latestMetadataPath(basePath + "/_delta_log") + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to extract table information using Delta Kernel", e); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index 1376f884e..3b770adf0 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -18,11 +18,7 @@ package org.apache.xtable.delta; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -41,22 +37,10 @@ import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.schema.SchemaUtils; -/** - * Converts between Delta and InternalTable schemas. Some items to be aware of: - * - *
    - *
  • Delta schemas are represented as Spark StructTypes which do not have enums so the enum - * types are lost when converting from XTable to Delta Lake representations - *
  • Delta does not have a fixed length byte array option so {@link InternalType#FIXED} is - * simply translated to a {@link org.apache.spark.sql.types.BinaryType} - *
  • Similarly, {@link InternalType#TIMESTAMP_NTZ} is translated to a long in Delta Lake - *
- */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaSchemaExtractor { private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id"; private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor(); - // Timestamps in Delta are microsecond precision by default private static final Map DEFAULT_TIMESTAMP_PRECISION_METADATA = Collections.singletonMap( diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java new file mode 100644 index 000000000..aa63cc581 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.io.IOException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.*; + +import lombok.Builder; + +import org.apache.hadoop.conf.Configuration; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.*; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.replay.ActionsIterator; +import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; + +import org.apache.xtable.delta.*; +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.*; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.spi.extractor.ConversionSource; +import org.apache.xtable.spi.extractor.DataFileIterator; + +@Builder +public class DeltaKernelConversionSource implements ConversionSource { + + @Builder.Default + private final DeltaKernelDataFileExtractor dataFileExtractor = + DeltaKernelDataFileExtractor.builder().build(); + + @Builder.Default + private final DeltaKernelActionsConverter actionsConverter = + DeltaKernelActionsConverter.getInstance(); + + private final String basePath; + private final String tableName; + private final Engine engine; + + private final StructType actionSchema = 
+
+  @Builder.Default
+  private final DeltaKernelTableExtractor tableExtractor =
+      DeltaKernelTableExtractor.builder().build();
+
+  private Optional<DeltaIncrementalChangesState> deltaIncrementalChangesState = Optional.empty();
+
+  @Override
+  public InternalTable getTable(Long version) {
+    Configuration hadoopConf = new Configuration();
+    try {
+      Engine engine = DefaultEngine.create(hadoopConf);
+      Table table = Table.forPath(engine, basePath);
+      Snapshot snapshot = table.getSnapshotAsOfVersion(engine, version);
+      return tableExtractor.table(table, snapshot, engine, tableName, basePath);
+    } catch (Exception e) {
+      throw new ReadException("Failed to get table at version " + version, e);
+    }
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    Configuration hadoopConf = new Configuration();
+    Engine engine = DefaultEngine.create(hadoopConf);
+    Table table = Table.forPath(engine, basePath);
+    Snapshot snapshot = table.getLatestSnapshot(engine);
+    return getTable(snapshot.getVersion());
+  }
+
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    Configuration hadoopConf = new Configuration();
+    Engine engine = DefaultEngine.create(hadoopConf);
+    Table kernelTable = Table.forPath(engine, basePath);
+    Snapshot snapshot = kernelTable.getLatestSnapshot(engine);
+    InternalTable table = getTable(snapshot.getVersion());
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(
+            getInternalDataFiles(snapshot, kernelTable, engine, table.getReadSchema()))
+        .sourceIdentifier(getCommitIdentifier(snapshot.getVersion()))
+        .build();
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long versionNumber) {
+    Configuration hadoopConf = new Configuration();
+    Engine engine = DefaultEngine.create(hadoopConf);
+    Table table = Table.forPath(engine, basePath);
+    Snapshot snapshot = table.getSnapshotAsOfVersion(engine, versionNumber);
+    InternalTable tableAtVersion =
+        tableExtractor.table(table, snapshot, engine, tableName, basePath);
+    Map<String, InternalDataFile> addedFiles = new HashMap<>();
+    String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
+    FileFormat fileFormat = actionsConverter.convertToFileFormat(provider);
+    // List only the commit file for this exact version.
+    List<FileStatus> files =
+        DeltaLogActionUtils.listDeltaLogFilesAsIter(
+                engine,
+                Collections.singleton(FileNames.DeltaLogFileType.COMMIT),
+                new Path(basePath),
+                versionNumber,
+                Optional.of(versionNumber),
+                false)
+            .toInMemoryList();
+
+    ActionsIterator actionsIterator =
+        new ActionsIterator(engine, files, actionSchema, Optional.empty());
+    while (actionsIterator.hasNext()) {
+      // Each ActionWrapper may wrap a batch of rows (actions)
+      CloseableIterator<Row> scanFileRows = actionsIterator.next().getColumnarBatch().getRows();
+      while (scanFileRows.hasNext()) {
+        Row scanFileRow = scanFileRows.next();
+        if (scanFileRow instanceof AddFile) {
+          Map<String, String> partitionValues =
+              InternalScanFileUtils.getPartitionValues(scanFileRow);
+          InternalDataFile dataFile =
+              actionsConverter.convertAddActionToInternalDataFile(
+                  (AddFile) scanFileRow,
+                  table,
+                  fileFormat,
+                  tableAtVersion.getPartitioningFields(),
+                  tableAtVersion.getReadSchema().getFields(),
+                  true,
+                  DeltaKernelPartitionExtractor.getInstance(),
+                  DeltaKernelStatsExtractor.getInstance(),
+                  partitionValues);
+          addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+        }
+      }
+    }
+
+    InternalFilesDiff internalFilesDiff =
+        InternalFilesDiff.builder().filesAdded(addedFiles.values()).build();
+    return TableChange.builder()
+        .tableAsOfChange(tableAtVersion)
+        .filesDiff(internalFilesDiff)
+        .sourceIdentifier(getCommitIdentifier(versionNumber))
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(
+      InstantsForIncrementalSync instantsForIncrementalSync) {
+    Configuration hadoopConf = new Configuration();
+    Engine engine = DefaultEngine.create(hadoopConf);
+    Table table = Table.forPath(engine, basePath);
+    Snapshot snapshot =
+        table.getSnapshotAsOfTimestamp(
+            engine, Timestamp.from(instantsForIncrementalSync.getLastSyncInstant()).getTime());
+
+    long versionNumberAtLastSyncInstant = snapshot.getVersion();
+    // TODO: initialize the incremental changes state before querying it, e.g.
+    // resetState(versionNumberAtLastSyncInstant + 1); until then getChangesState()
+    // below throws IllegalStateException.
+    return CommitsBacklog.<Long>builder()
+        .commitsToProcess(getChangesState().getVersionsInSortedOrder())
+        .build();
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    Configuration hadoopConf = new Configuration();
+    Engine engine = DefaultEngine.create(hadoopConf);
+    Table table = Table.forPath(engine, basePath);
+    Snapshot snapshot = table.getSnapshotAsOfTimestamp(engine, Timestamp.from(instant).getTime());
+
+    // There is a chance the earliest commit of the table is returned if the instant is before
+    // the earliest commit of the table, hence the additional check.
+    Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine));
+    return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
+  }
+
+  @Override
+  public String getCommitIdentifier(Long commit) {
+    return String.valueOf(commit);
+  }
+
+  // Not yet wired up; kept for the follow-up that enables incremental sync.
+  // private void resetState(long versionToStartFrom) {
+  //   deltaIncrementalChangesState =
+  //       Optional.of(
+  //           DeltaIncrementalChangesState.builder()
+  //               .deltaLog(deltaLog)
+  //               .versionToStartFrom(versionToStartFrom)
+  //               .build());
+  // }
+
+  private List<PartitionFileGroup> getInternalDataFiles(
+      Snapshot snapshot, Table table, Engine engine, InternalSchema schema) {
+    try (DataFileIterator fileIterator =
+        dataFileExtractor.iterator(snapshot, table, engine, schema)) {
+      List<InternalDataFile> dataFiles = new ArrayList<>();
+      fileIterator.forEachRemaining(dataFiles::add);
+      return PartitionFileGroup.fromFiles(dataFiles);
+    } catch (Exception e) {
+      throw new ReadException("Failed to iterate through Delta data files", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {}
+
+  private DeltaIncrementalChangesState getChangesState() {
+    return deltaIncrementalChangesState.orElseThrow(
+        () -> new IllegalStateException("DeltaIncrementalChangesState is not initialized"));
+  }
+}
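Every method in DeltaKernelConversionSource repeats the same three-step lookup: build an Engine over a Hadoop Configuration, resolve the Table from its base path, then ask for a snapshot. A minimal, self-contained sketch of that pattern (the table path and the version are illustrative assumptions, not values from this patch):

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;

public class SnapshotLookupSketch {
  public static void main(String[] args) {
    // The DefaultEngine wraps a Hadoop Configuration for file system access.
    Engine engine = DefaultEngine.create(new Configuration());
    Table table = Table.forPath(engine, "/tmp/delta/my_table"); // hypothetical path
    // Latest snapshot, as getCurrentSnapshot() does:
    Snapshot latest = table.getLatestSnapshot(engine);
    System.out.println("latest version = " + latest.getVersion());
    // Time travel by version, as getTable(Long) does:
    Snapshot atVersionZero = table.getSnapshotAsOfVersion(engine, 0L);
    System.out.println("schema at v0 = " + atVersionZero.getSchema());
  }
}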
diff --git a/xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java b/xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java
new file mode 100644
index 000000000..050d12e64
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable;
+
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.delta.kernel.*;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public class DeltaTableKernel {
+  private static final Logger logger = LoggerFactory.getLogger(DeltaTableKernel.class);
+
+  @Test
+  public void readDeltaKernel() throws IOException {
+    // TODO: replace this developer-specific absolute path with a test resource
+    // before merging; the test cannot run anywhere else as written.
+    String myTablePath =
+        "/Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis"; // fully qualified
+    Configuration hadoopConf = new Configuration();
+    Engine myEngine = DefaultEngine.create(hadoopConf);
+    Table myTable = Table.forPath(myEngine, myTablePath);
+    Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
+    long version = mySnapshot.getVersion();
+    StructType tableSchema = mySnapshot.getSchema();
+    logger.info("Reading version {} with schema {}", version, tableSchema);
+    Scan myScan = mySnapshot.getScanBuilder().build();
+
+    // Common information about scanning for all data files to read.
+    Row scanState = myScan.getScanState(myEngine);
+
+    // Information about the list of scan files to read
+    CloseableIterator<FilteredColumnarBatch> fileIter = myScan.getScanFiles(myEngine);
+    try {
+      StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(myEngine, scanState);
+      while (fileIter.hasNext()) {
+        FilteredColumnarBatch scanFilesBatch = fileIter.next();
+        try (CloseableIterator<Row> scanFileRows = scanFilesBatch.getRows()) {
+          while (scanFileRows.hasNext()) {
+            Row scanFileRow = scanFileRows.next();
+            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+            CloseableIterator<ColumnarBatch> physicalDataIter =
+                myEngine
+                    .getParquetHandler()
+                    .readParquetFiles(
+                        singletonCloseableIterator(fileStatus),
+                        physicalReadSchema,
+                        Optional.empty());
+            try (CloseableIterator<FilteredColumnarBatch> transformedData =
+                Scan.transformPhysicalData(myEngine, scanState, scanFileRow, physicalDataIter)) {
+              while (transformedData.hasNext()) {
+                FilteredColumnarBatch logicalData = transformedData.next();
+                ColumnarBatch dataBatch = logicalData.getData();
+
+                // Access the data column by column; ordinals 0 and 1 are assumed
+                // to be an int and a string in the demo table.
+                ColumnVector column0 = dataBatch.getColumnVector(0);
+                ColumnVector column1 = dataBatch.getColumnVector(1);
+
+                for (int rowIndex = 0; rowIndex < column0.getSize(); rowIndex++) {
+                  logger.info("{}", column0.getInt(rowIndex));
+                }
+                for (int rowIndex = 0; rowIndex < column1.getSize(); rowIndex++) {
+                  logger.info("{}", column1.getString(rowIndex));
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      logger.error("IOException while reading Delta table", e);
+    }
+  }
+}
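Reading by hardcoded column ordinals, as the scratch test above does, only works for the demo table. A sketch of driving extraction from the batch's own schema instead; the class name is hypothetical and only int and string columns are handled:

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;

public final class BatchPrinter {
  // Prints every cell of a logical batch, letting the schema pick the accessor;
  // extend the type checks for the remaining Delta types as needed.
  public static void printBatch(ColumnarBatch batch) {
    StructType schema = batch.getSchema();
    for (int col = 0; col < schema.length(); col++) {
      ColumnVector vector = batch.getColumnVector(col);
      for (int row = 0; row < vector.getSize(); row++) {
        if (vector.isNullAt(row)) {
          System.out.println("null");
        } else if (schema.at(col).getDataType() instanceof IntegerType) {
          System.out.println(vector.getInt(row));
        } else if (schema.at(col).getDataType() instanceof StringType) {
          System.out.println(vector.getString(row));
        }
      }
    }
  }
}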
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaKernelConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaKernelConversionSource.java
new file mode 100644
index 000000000..13ac7a059
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaKernelConversionSource.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.delta;
+
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.kernel.DeltaKernelConversionSource;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.*;
+
+public class ITDeltaKernelConversionSource {
+  private static final InternalField COL1_INT_FIELD =
+      InternalField.builder()
+          .name("col1")
+          .schema(
+              InternalSchema.builder()
+                  .name("integer")
+                  .dataType(InternalType.INT)
+                  .isNullable(true)
+                  .build())
+          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+          .build();
+
+  private static final InternalField COL2_INT_FIELD =
+      InternalField.builder()
+          .name("col2")
+          .schema(
+              InternalSchema.builder()
+                  .name("integer")
+                  .dataType(InternalType.INT)
+                  .isNullable(true)
+                  .build())
+          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+          .build();
+
+  // col3 is created from an integer literal below, so it is an INT field,
+  // not a string (previously misnamed COL3_STR_FIELD).
+  private static final InternalField COL3_INT_FIELD =
+      InternalField.builder()
+          .name("col3")
+          .schema(
+              InternalSchema.builder()
+                  .name("integer")
+                  .dataType(InternalType.INT)
+                  .isNullable(true)
+                  .build())
+          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+          .build();
+  private static final ColumnStat COL2_COLUMN_STAT =
+      ColumnStat.builder()
+          .field(COL2_INT_FIELD)
+          .range(Range.vector(2, 2))
+          .numNulls(0)
+          .numValues(1)
+          .totalSize(0)
+          .build();
+  private static final ColumnStat COL1_COLUMN_STAT =
+      ColumnStat.builder()
+          .field(COL1_INT_FIELD)
+          .range(Range.vector(1, 1))
+          .numNulls(0)
+          .numValues(1)
+          .totalSize(0)
+          .build();
+
+  private DeltaKernelConversionSourceProvider conversionSourceProvider;
+  private static SparkSession sparkSession;
+
+  @BeforeAll
+  public static void setupOnce() {
+    sparkSession =
+        SparkSession.builder()
+            .appName("TestDeltaTable")
+            .master("local[4]")
+            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+            .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
+            .config("spark.sql.shuffle.partitions", "1")
+            .config("spark.default.parallelism", "1")
+            .config("spark.serializer", KryoSerializer.class.getName())
+            .getOrCreate();
+  }
+
+  @TempDir private static Path tempDir;
+
+  @AfterAll
+  public static void tearDownSparkSession() {
+    if (sparkSession != null) {
+      sparkSession.catalog().clearCache();
+      sparkSession.stop();
+    }
+  }
+
+  @BeforeEach
+  void setUp() {
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("fs.defaultFS", "file:///");
+
+    conversionSourceProvider = new DeltaKernelConversionSourceProvider();
+    conversionSourceProvider.init(hadoopConf);
+  }
+
+  @Test
+  void getCurrentSnapshotNonPartitionedTest() throws URISyntaxException {
+    // Table name
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Create table with a single row using Spark
+    sparkSession.sql(
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA LOCATION '"
+            + basePath
+            + "' AS SELECT * FROM VALUES (1, 2)");
+    // Create Delta source
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaKernelConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    // Get current snapshot
+    InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+    // Validate table
+    List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD);
+    validateTable(
+        snapshot.getTable(),
+        tableName,
+        TableFormat.DELTA,
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(fields)
+            .build(),
+        DataLayoutStrategy.FLAT,
+        "file://" + basePath,
+        snapshot.getTable().getLatestMetadataPath(),
+        Collections.emptyList());
+    // Validate data files
+    List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT);
+    Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
+
+    validatePartitionDataFiles(
+        PartitionFileGroup.builder()
+            .files(
+                Collections.singletonList(
+                    InternalDataFile.builder()
+                        .physicalPath("file:/fake/path")
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .partitionValues(Collections.emptyList())
+                        .fileSizeBytes(716)
+                        .recordCount(1)
+                        .columnStats(columnStats)
+                        .build()))
+            .partitionValues(Collections.emptyList())
+            .build(),
+        snapshot.getPartitionedDataFiles().get(0));
+  }
+
+  @Test
+  void getCurrentTableTest() {
+    // Table name
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Create table with a single row using Spark
+    sparkSession.sql(
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA LOCATION '"
+            + basePath
+            + "' AS SELECT * FROM VALUES (1, 2, 3)");
+    // Create Delta source
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaKernelConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    // Get current table
+    InternalTable internalTable = conversionSource.getCurrentTable();
+    List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD, COL3_INT_FIELD);
+    validateTable(
+        internalTable,
+        tableName,
+        TableFormat.DELTA,
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(fields)
+            .build(),
+        DataLayoutStrategy.FLAT,
+        "file://" + basePath,
+        internalTable.getLatestMetadataPath(),
+        Collections.emptyList());
+  }
+
+  @Test
+  void getCurrentSnapshotPartitionedTest() throws URISyntaxException {
+    // Table name
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Create table with a single row using Spark
+    sparkSession.sql(
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA PARTITIONED BY (part_col)\n"
+            + "LOCATION '"
+            + basePath
+            + "' AS SELECT 'SingleValue' AS part_col, 1 AS col1, 2 AS col2");
+    // Create Delta source
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaKernelConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    // Get current snapshot
+    InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+    // Validate table
+    InternalField partCol =
+        InternalField.builder()
+            .name("part_col")
+            .schema(
+                InternalSchema.builder()
+                    .name("string")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    List<InternalField> fields = Arrays.asList(partCol, COL1_INT_FIELD, COL2_INT_FIELD);
+    validateTable(
+        snapshot.getTable(),
+        tableName,
+        TableFormat.DELTA,
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(fields)
+            .build(),
+        DataLayoutStrategy.HIVE_STYLE_PARTITION,
+        "file://" + basePath,
+        snapshot.getTable().getLatestMetadataPath(),
+        Collections.singletonList(
+            InternalPartitionField.builder()
+                .sourceField(partCol)
+                .transformType(PartitionTransformType.VALUE)
+                .build()));
+    // Validate data files
+    List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT);
+    Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
+    List<PartitionValue> partitionValue =
+        Collections.singletonList(
+            PartitionValue.builder()
+                .partitionField(
+                    InternalPartitionField.builder()
+                        .sourceField(partCol)
+                        .transformType(PartitionTransformType.VALUE)
+                        .build())
+                .range(Range.scalar("SingleValue"))
+                .build());
+    validatePartitionDataFiles(
+        PartitionFileGroup.builder()
+            .partitionValues(partitionValue)
+            .files(
+                Collections.singletonList(
+                    InternalDataFile.builder()
+                        .physicalPath("file:/fake/path")
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .partitionValues(partitionValue)
+                        .fileSizeBytes(716)
+                        .recordCount(1)
+                        .columnStats(columnStats)
+                        .build()))
+            .build(),
+        snapshot.getPartitionedDataFiles().get(0));
+  }
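+  // The partitioned test builds the same VALUE-transform InternalPartitionField
+  // twice; a small helper (sketch, not referenced by the tests yet) would keep the
+  // two constructions in sync:
+  //
+  //   private static List<PartitionValue> valuePartition(InternalField sourceField, String value) {
+  //     return Collections.singletonList(
+  //         PartitionValue.builder()
+  //             .partitionField(
+  //                 InternalPartitionField.builder()
+  //                     .sourceField(sourceField)
+  //                     .transformType(PartitionTransformType.VALUE)
+  //                     .build())
+  //             .range(Range.scalar(value))
+  //             .build());
+  //   }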
"yearOfBirth" : null, false); + // System.out.println("testSparkDeltaTable" + testSparkDeltaTable.getColumnsToSelect()); + List> allActiveFiles = new ArrayList<>(); + List allTableChanges = new ArrayList<>(); + List rows = testSparkDeltaTable.insertRows(50); + Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); + allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + + testSparkDeltaTable.insertRows(50); + allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + + testSparkDeltaTable.upsertRows(rows.subList(0, 20)); + allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + + testSparkDeltaTable.insertRows(50); + allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + + testSparkDeltaTable.insertRows(50); + allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) + .build(); + DeltaKernelConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance(tableConfig); + assertEquals(200L, testSparkDeltaTable.getNumRows()); + InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); + + if (isPartitioned) { + validateDeltaPartitioning(internalSnapshot); + } + ValidationTestHelper.validateSnapshot( + internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1)); + // Get changes in incremental format. + InstantsForIncrementalSync instantsForIncrementalSync = + InstantsForIncrementalSync.builder() + .lastSyncInstant(Instant.ofEpochMilli(timestamp1)) + .build(); + // CommitsBacklog commitsBacklog = + // conversionSource.getCommitsBacklog(instantsForIncrementalSync); + // for (Long version : commitsBacklog.getCommitsToProcess()) { + // TableChange tableChange = conversionSource.getTableChangeForCommit(version); + // allTableChanges.add(tableChange); + // } + // ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges); + } + + private void validateDeltaPartitioning(InternalSnapshot internalSnapshot) { + List partitionFields = + internalSnapshot.getTable().getPartitioningFields(); + assertEquals(1, partitionFields.size()); + InternalPartitionField partitionField = partitionFields.get(0); + assertEquals("birthDate", partitionField.getSourceField().getName()); + assertEquals(PartitionTransformType.YEAR, partitionField.getTransformType()); + } + + private void validatePartitionDataFiles( + PartitionFileGroup expectedPartitionFiles, PartitionFileGroup actualPartitionFiles) + throws URISyntaxException { + assertEquals( + expectedPartitionFiles.getPartitionValues(), actualPartitionFiles.getPartitionValues()); + validateDataFiles(expectedPartitionFiles.getDataFiles(), actualPartitionFiles.getDataFiles()); + } + + private void validateDataFiles( + List expectedFiles, List actualFiles) + throws URISyntaxException { + Assertions.assertEquals(expectedFiles.size(), actualFiles.size()); + for (int i = 0; i < expectedFiles.size(); i++) { + InternalDataFile expected = expectedFiles.get(i); + InternalDataFile actual = actualFiles.get(i); + validatePropertiesDataFile(expected, actual); + } + } + + private static Stream testWithPartitionToggle() { + return Stream.of(Arguments.of(false), Arguments.of(true)); + } + + private void validatePropertiesDataFile(InternalDataFile expected, InternalDataFile actual) + throws URISyntaxException { + Assertions.assertTrue( + Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(), + () -> 
"path == " + actual.getPhysicalPath() + " is not absolute"); + Assertions.assertEquals(expected.getFileFormat(), actual.getFileFormat()); + Assertions.assertEquals(expected.getPartitionValues(), actual.getPartitionValues()); + Assertions.assertEquals(expected.getFileSizeBytes(), actual.getFileSizeBytes()); + Assertions.assertEquals(expected.getRecordCount(), actual.getRecordCount()); + Instant now = Instant.now(); + long minRange = now.minus(1, ChronoUnit.HOURS).toEpochMilli(); + long maxRange = now.toEpochMilli(); + Assertions.assertTrue( + actual.getLastModified() > minRange && actual.getLastModified() <= maxRange, + () -> + "last modified == " + + actual.getLastModified() + + " is expected between " + + minRange + + " and " + + maxRange); + Assertions.assertEquals(expected.getColumnStats(), actual.getColumnStats()); + } +} diff --git a/xtable-utilities/src/test/resources/my_config.yaml b/xtable-utilities/src/test/resources/my_config.yaml index 1416c04c2..f0594eb9f 100644 --- a/xtable-utilities/src/test/resources/my_config.yaml +++ b/xtable-utilities/src/test/resources/my_config.yaml @@ -19,6 +19,6 @@ targetFormats: - DELTA datasets: - - tableBasePath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis - tableDataPath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data + tableBasePath: /Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis + tableDataPath: /Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data tableName: taxis \ No newline at end of file