Delta Kernel Draft PR #729
base: main
Changes from 12 commits
pom.xml (root)

@@ -53,7 +53,7 @@
     <module>xtable-utilities</module>
     <module>xtable-aws</module>
     <module>xtable-hive-metastore</module>
-    <module>xtable-service</module>
+    <!-- <module>xtable-service</module>-->

Review comments:
- This should be added back; any reason why you had to comment this out?
- @vaibhavk1992 I think if you rebase with the latest main branch you shouldn't see those failures.

     </modules>

     <properties>
@@ -713,7 +713,7 @@
       </executions>
       <configuration>
         <skip>${skipUTs}</skip>
-        <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        <redirectTestOutputToFile>false</redirectTestOutputToFile>
         <trimStackTrace>false</trimStackTrace>
         <forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds>
       </configuration>

Review comment:
- Why revert this?
pom.xml (module)

@@ -110,6 +110,19 @@
       <scope>test</scope>
     </dependency>

+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-api</artifactId>
+      <version>4.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-defaults</artifactId>
+      <version>4.0.0</version>
+    </dependency>
+
     <!-- Hadoop dependencies -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>

Review comment:
- Can you add a property in the root pom called <delta.kernel.version>4.0.0</delta.kernel.version> instead of using the hardcoded value? (A sketch follows this diff.) Also curious how you ended up choosing the delta kernel version; is there some specific version that needs to align with the delta lake version we have in the repo?
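A minimal sketch of what the reviewer is asking for. The property name is taken from the comment itself; which module pom consumes it is an assumption:

    <!-- root pom.xml -->
    <properties>
      <delta.kernel.version>4.0.0</delta.kernel.version>
    </properties>

    <!-- module pom.xml: reference the shared property instead of hardcoding 4.0.0 -->
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-kernel-api</artifactId>
      <version>${delta.kernel.version}</version>
    </dependency>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-kernel-defaults</artifactId>
      <version>${delta.kernel.version}</version>
    </dependency>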
DeltaKernelActionsConverter.java (new file)

@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.delta;

Review comment:
- Let's move new DeltaKernel* classes to org.apache.xtable.kernel?
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import scala.collection.JavaConverters;

import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.DataFileStatus;
import io.delta.kernel.utils.FileStatus;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaKernelActionsConverter {
  private static final DeltaKernelActionsConverter INSTANCE = new DeltaKernelActionsConverter();

  public static DeltaKernelActionsConverter getInstance() {
    return INSTANCE;
  }
  public InternalDataFile convertAddActionToInternalDataFile(
      FileStatus addFile,
      Snapshot deltaSnapshot,
      FileFormat fileFormat,
      List<InternalPartitionField> partitionFields,
      List<InternalField> fields,
      boolean includeColumnStats,
      DeltaKernelPartitionExtractor partitionExtractor,
      DeltaKernelStatsExtractor fileStatsExtractor,
      Map<String, String> partitionValues) {
    DataFileStatus dataFileStatus =
        new DataFileStatus(
            addFile.getPath(),
            addFile.getModificationTime(),
            addFile.getSize(),
            Optional.empty() // file statistics are not available on the kernel FileStatus
            );
    FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(dataFileStatus, fields);
    List<ColumnStat> columnStats =
        includeColumnStats ? fileStats.getColumnStats() : Collections.emptyList();
    long recordCount = fileStats.getNumRecords();
    Configuration hadoopConf = new Configuration();
    Engine myEngine = DefaultEngine.create(hadoopConf);
    Table myTable = Table.forPath(myEngine, addFile.getPath());
    // Converting an immutable Java map straight to an immutable Scala map is not working here,
    // so go through a mutable Scala map instead.
    scala.collection.mutable.Map<String, String> scalaMap =
        JavaConverters.mapAsScalaMap(partitionValues);

    return InternalDataFile.builder()
        .physicalPath(getFullPathToFile(deltaSnapshot, addFile.getPath(), myTable))
        .fileFormat(fileFormat)
        .fileSizeBytes(addFile.getSize())
        .lastModified(addFile.getModificationTime())
        .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields))
        .columnStats(columnStats)
        .recordCount(recordCount)
        .build();
  }
  //  public InternalDataFile convertRemoveActionToInternalDataFile(
  //      RemoveFile removeFile,
  //      Snapshot deltaSnapshot,
  //      FileFormat fileFormat,
  //      List<InternalPartitionField> partitionFields,
  //      DeltaPartitionExtractor partitionExtractor) {
  //    return InternalDataFile.builder()
  //        .physicalPath(getFullPathToFile(deltaSnapshot, removeFile.path()))
  //        .fileFormat(fileFormat)
  //        .partitionValues(
  //            partitionExtractor.partitionValueExtraction(
  //                removeFile.partitionValues(), partitionFields))
  //        .build();
  //  }
  public FileFormat convertToFileFormat(String provider) {
    if (provider.equals("parquet")) {
      return FileFormat.APACHE_PARQUET;
    } else if (provider.equals("orc")) {
      return FileFormat.APACHE_ORC;
    }
    throw new NotSupportedException(
        String.format("delta file format %s is not recognized", provider));
  }

  static String getFullPathToFile(Snapshot snapshot, String dataFilePath, Table myTable) {
    Configuration hadoopConf = new Configuration();
    Engine myEngine = DefaultEngine.create(hadoopConf);

    String tableBasePath = myTable.getPath(myEngine);
    // String tableBasePath = snapshot.dataPath().toUri().toString();
    if (dataFilePath.startsWith(tableBasePath)) {
      return dataFilePath;
    }
    return tableBasePath + Path.SEPARATOR + dataFilePath;
  }
  /**
   * Extracts the representation of the deletion vector information corresponding to an AddFile
   * action. Currently, this method extracts and returns the path to the data file for which a
   * deletion vector data is present.
   *
   * @param snapshot the commit snapshot
   * @param addFile the add file action
   * @return the deletion vector representation (path of data file), or null if no deletion vector
   *     is present
   */
  //  public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
  //    DeletionVectorDescriptor deletionVector = addFile.deletionVector();
  //    if (deletionVector == null) {
  //      return null;
  //    }
  //
  //    String dataFilePath = addFile.path();
  //    return getFullPathToFile(snapshot, dataFilePath);
  //  }
}
DeltaKernelConversionSourceProvider.java (new file)

@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.delta;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;

import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.kernel.DeltaKernelConversionSource;

public class DeltaKernelConversionSourceProvider extends ConversionSourceProvider<Long> {
  @Override
  public DeltaKernelConversionSource getConversionSourceInstance(SourceTable sourceTable) {
    Configuration hadoopConf = new Configuration();
    Engine engine = DefaultEngine.create(hadoopConf);
    // DeltaTable deltaTable = DeltaTable.forPath(sourceTable.getBasePath());
    return DeltaKernelConversionSource.builder()
        .tableName(sourceTable.getName())
        .basePath(sourceTable.getBasePath())
        .engine(engine)
        .build();
  }
}

Review comment:
- Is there any reason why you are creating a new hadoopConf? Can you instead use the hadoopConf from the parent class, similar to what … (see the sketch after this file)
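A minimal sketch of the reviewer's suggestion, assuming ConversionSourceProvider keeps the configuration passed to its init(...) method in a protected hadoopConf field (the field name is an assumption; the truncated comment above appears to point at an existing provider that does the same):

    package org.apache.xtable.delta;

    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;

    import org.apache.xtable.conversion.ConversionSourceProvider;
    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.kernel.DeltaKernelConversionSource;

    public class DeltaKernelConversionSourceProvider extends ConversionSourceProvider<Long> {
      @Override
      public DeltaKernelConversionSource getConversionSourceInstance(SourceTable sourceTable) {
        // Reuse the inherited hadoopConf (assumed protected on the parent class) so that
        // caller-supplied filesystem settings are honored instead of a fresh default Configuration.
        Engine engine = DefaultEngine.create(hadoopConf);
        return DeltaKernelConversionSource.builder()
            .tableName(sourceTable.getName())
            .basePath(sourceTable.getBasePath())
            .engine(engine)
            .build();
      }
    }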
DeltaKernelDataFileExtractor.java (new file)

@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.delta;

import java.util.*;
import java.util.stream.Collectors;

import lombok.Builder;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.Scan;
import io.delta.kernel.Snapshot;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.spi.extractor.DataFileIterator;

/** DeltaKernelDataFileExtractor lets the consumer iterate over the data files in a snapshot. */
@Builder
public class DeltaKernelDataFileExtractor {

  @Builder.Default
  private final DeltaKernelPartitionExtractor partitionExtractor =
      DeltaKernelPartitionExtractor.getInstance();

  @Builder.Default
  private final DeltaKernelStatsExtractor fileStatsExtractor =
      DeltaKernelStatsExtractor.getInstance();

  @Builder.Default
  private final DeltaKernelActionsConverter actionsConverter =
      DeltaKernelActionsConverter.getInstance();

  private final String basePath;

  /**
   * Initializes an iterator for Delta Lake files.
   *
   * @return Delta table file iterator
   */
  public DataFileIterator iterator(Snapshot deltaSnapshot, InternalSchema schema) {
    return new DeltaDataFileIterator(deltaSnapshot, schema, true);
  }
  public class DeltaDataFileIterator implements DataFileIterator {
    private final FileFormat fileFormat;
    private final List<InternalField> fields;
    private final List<InternalPartitionField> partitionFields;
    private Iterator<InternalDataFile> dataFilesIterator = Collections.emptyIterator();

    private DeltaDataFileIterator(
        Snapshot snapshot, InternalSchema schema, boolean includeColumnStats) {
      String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
      this.fileFormat = actionsConverter.convertToFileFormat(provider);

      this.fields = schema.getFields();

      StructType fullSchema = snapshot.getSchema(); // the full table schema
      List<String> partitionColumns = snapshot.getPartitionColumnNames();

      List<StructField> partitionStructFields =
          fullSchema.fields().stream()
              .filter(field -> partitionColumns.contains(field.getName()))
              .collect(Collectors.toList());

      StructType partitionSchema = new StructType(partitionStructFields);

      this.partitionFields =
          partitionExtractor.convertFromDeltaPartitionFormat(schema, partitionSchema);
      Configuration hadoopConf = new Configuration();
      Engine engine = DefaultEngine.create(hadoopConf);

      Scan myScan = snapshot.getScanBuilder().build();
      CloseableIterator<FilteredColumnarBatch> scanFiles = myScan.getScanFiles(engine);
      // Materialize every scan file into one list so the iterator covers all files in the
      // snapshot rather than only the last scan file row seen.
      List<InternalDataFile> dataFiles = new ArrayList<>();
      while (scanFiles.hasNext()) {
        FilteredColumnarBatch scanFileColumnarBatch = scanFiles.next();
        CloseableIterator<Row> scanFileRows = scanFileColumnarBatch.getRows();
        while (scanFileRows.hasNext()) {
          Row scanFileRow = scanFileRows.next();

          // From the scan file row, extract the file path, size and modification time metadata
          // needed to read the file.
          FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
          Map<String, String> partitionValues =
              InternalScanFileUtils.getPartitionValues(scanFileRow);
          // Convert the FileStatus to an InternalDataFile using the actions converter
          dataFiles.add(
              actionsConverter.convertAddActionToInternalDataFile(
                  fileStatus,
                  snapshot,
                  fileFormat,
                  partitionFields,
                  fields,
                  includeColumnStats,
                  partitionExtractor,
                  fileStatsExtractor,
                  partitionValues));
        }
      }
      this.dataFilesIterator = dataFiles.iterator();
    }
    @Override
    public void close() throws Exception {}

    @Override
    public boolean hasNext() {
      return this.dataFilesIterator.hasNext();
    }

    @Override
    public InternalDataFile next() {
      return dataFilesIterator.next();
    }
  }
}
Review comment:
- Why comment this?
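For context, a hypothetical usage sketch of the new extractor, not part of the PR: the table path is a placeholder, the InternalSchema is assumed to be produced by the PR's schema extraction (not shown in this diff), and the InternalDataFile getters follow from the builder fields above.

    package org.apache.xtable.delta;

    import org.apache.hadoop.conf.Configuration;

    import io.delta.kernel.Snapshot;
    import io.delta.kernel.Table;
    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;

    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.spi.extractor.DataFileIterator;

    public class DeltaKernelDataFileExtractorExample {
      // schema is assumed to be produced elsewhere (e.g. by a kernel schema extractor)
      public static void printDataFiles(String basePath, InternalSchema schema) throws Exception {
        Engine engine = DefaultEngine.create(new Configuration());
        Snapshot snapshot = Table.forPath(engine, basePath).getLatestSnapshot(engine);

        DeltaKernelDataFileExtractor extractor =
            DeltaKernelDataFileExtractor.builder().basePath(basePath).build();
        // DataFileIterator declares close(), so try-with-resources releases any
        // underlying resources once iteration is done.
        try (DataFileIterator files = extractor.iterator(snapshot, schema)) {
          while (files.hasNext()) {
            InternalDataFile file = files.next();
            System.out.println(file.getPhysicalPath() + " (" + file.getRecordCount() + " records)");
          }
        }
      }
    }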