Skip to content

Commit 1c42e6e

Browse files
author
Zac Sanchez
committed
Support configuring avro compression in Manifests
Change-Id: I7db2ee9d599f1af28ac61da00c1a6c31691d4975 Reviewed-on: https://gerrit.trading.imc.intra/c/data-engineering/iceberg/+/660816 Reviewed-by: Shiva Ganapathy <shiva.ganapathy@imc.com> Tested-by: Teamcity Reviewed-by: Lorenzo Siega Battel <lorenzo.siegabattel@imc.com> Static-Analysis: Teamcity Reviewed-by: Alexander Borodin <alex.borodin@imc.com>
1 parent 15f6123 commit 1c42e6e

File tree

16 files changed

+535
-167
lines changed

16 files changed

+535
-167
lines changed

.palantir/revapi.yml

Lines changed: 181 additions & 97 deletions
Large diffs are not rendered by default.

core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ private ManifestFile copyManifest(ManifestFile manifest) {
163163
specsById,
164164
newFile,
165165
snapshotId(),
166-
summaryBuilder);
166+
summaryBuilder,
167+
current.properties());
167168
}
168169

169170
@Override

core/src/main/java/org/apache/iceberg/FastAppend.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ private ManifestFile copyManifest(ManifestFile manifest) {
140140
current.specsById(),
141141
newManifestFile,
142142
snapshotId(),
143-
summaryBuilder);
143+
summaryBuilder,
144+
current.properties());
144145
}
145146

146147
@Override

core/src/main/java/org/apache/iceberg/ManifestFiles.java

Lines changed: 136 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,22 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
153153
return write(1, spec, outputFile, null);
154154
}
155155

156+
/**
157+
* Create a new {@link ManifestWriter}.
158+
*
159+
* <p>Manifests created by this writer have all entry snapshot IDs set to null. All entries will
160+
* inherit the snapshot ID that will be assigned to the manifest on commit.
161+
*
162+
* @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
163+
* @param outputFile the destination file location
164+
* @param tableProperties a map of table properties
165+
* @return a manifest writer
166+
*/
167+
public static ManifestWriter<DataFile> write(
168+
PartitionSpec spec, OutputFile outputFile, Map<String, String> tableProperties) {
169+
return write(1, spec, outputFile, null, tableProperties);
170+
}
171+
156172
/**
157173
* Create a new {@link ManifestWriter} for the given format version.
158174
*
@@ -165,7 +181,35 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
165181
public static ManifestWriter<DataFile> write(
166182
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
167183
return write(
168-
formatVersion, spec, EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
184+
formatVersion,
185+
spec,
186+
EncryptedFiles.plainAsEncryptedOutput(outputFile),
187+
snapshotId,
188+
Map.of());
189+
}
190+
191+
/**
192+
* Create a new {@link ManifestWriter} for the given format version.
193+
*
194+
* @param formatVersion a target format version
195+
* @param spec a {@link PartitionSpec}
196+
* @param outputFile an {@link OutputFile} where the manifest will be written
197+
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited
198+
* @param tableProperties a map of table properties
199+
* @return a manifest writer
200+
*/
201+
public static ManifestWriter<DataFile> write(
202+
int formatVersion,
203+
PartitionSpec spec,
204+
OutputFile outputFile,
205+
Long snapshotId,
206+
Map<String, String> tableProperties) {
207+
return write(
208+
formatVersion,
209+
spec,
210+
EncryptedFiles.plainAsEncryptedOutput(outputFile),
211+
snapshotId,
212+
tableProperties);
169213
}
170214

171215
/**
@@ -182,7 +226,26 @@ public static ManifestWriter<DataFile> write(
182226
PartitionSpec spec,
183227
EncryptedOutputFile encryptedOutputFile,
184228
Long snapshotId) {
185-
return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, null);
229+
return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, null, Map.of());
230+
}
231+
232+
/**
233+
* Create a new {@link ManifestWriter} for the given format version.
234+
*
235+
* @param formatVersion a target format version
236+
* @param spec a {@link PartitionSpec}
237+
* @param encryptedOutputFile an {@link EncryptedOutputFile} where the manifest will be written
238+
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
239+
* @param tableProperties a map of table properties
240+
* @return a manifest writer
241+
*/
242+
public static ManifestWriter<DataFile> write(
243+
int formatVersion,
244+
PartitionSpec spec,
245+
EncryptedOutputFile encryptedOutputFile,
246+
Long snapshotId,
247+
Map<String, String> tableProperties) {
248+
return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, null, tableProperties);
186249
}
187250

188251
/**
@@ -200,16 +263,19 @@ static ManifestWriter<DataFile> newWriter(
200263
PartitionSpec spec,
201264
EncryptedOutputFile encryptedOutputFile,
202265
Long snapshotId,
203-
Long firstRowId) {
266+
Long firstRowId,
267+
Map<String, String> tableProperties) {
204268
switch (formatVersion) {
205269
case 1:
206-
return new ManifestWriter.V1Writer(spec, encryptedOutputFile, snapshotId);
270+
return new ManifestWriter.V1Writer(spec, encryptedOutputFile, snapshotId, tableProperties);
207271
case 2:
208-
return new ManifestWriter.V2Writer(spec, encryptedOutputFile, snapshotId);
272+
return new ManifestWriter.V2Writer(spec, encryptedOutputFile, snapshotId, tableProperties);
209273
case 3:
210-
return new ManifestWriter.V3Writer(spec, encryptedOutputFile, snapshotId, firstRowId);
274+
return new ManifestWriter.V3Writer(
275+
spec, encryptedOutputFile, snapshotId, firstRowId, tableProperties);
211276
case 4:
212-
return new ManifestWriter.V4Writer(spec, encryptedOutputFile, snapshotId, firstRowId);
277+
return new ManifestWriter.V4Writer(
278+
spec, encryptedOutputFile, snapshotId, firstRowId, tableProperties);
213279
}
214280
throw new UnsupportedOperationException(
215281
"Cannot write manifest for table version: " + formatVersion);
@@ -247,7 +313,35 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
247313
public static ManifestWriter<DeleteFile> writeDeleteManifest(
248314
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
249315
return writeDeleteManifest(
250-
formatVersion, spec, EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
316+
formatVersion,
317+
spec,
318+
EncryptedFiles.plainAsEncryptedOutput(outputFile),
319+
snapshotId,
320+
Map.of());
321+
}
322+
323+
/**
324+
* Create a new {@link ManifestWriter} for the given format version.
325+
*
326+
* @param formatVersion a target format version
327+
* @param spec a {@link PartitionSpec}
328+
* @param outputFile an {@link OutputFile} where the manifest will be written
329+
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
330+
* @param tableProperties a map of table properties
331+
* @return a manifest writer
332+
*/
333+
public static ManifestWriter<DeleteFile> writeDeleteManifest(
334+
int formatVersion,
335+
PartitionSpec spec,
336+
OutputFile outputFile,
337+
Long snapshotId,
338+
Map<String, String> tableProperties) {
339+
return writeDeleteManifest(
340+
formatVersion,
341+
spec,
342+
EncryptedFiles.plainAsEncryptedOutput(outputFile),
343+
snapshotId,
344+
tableProperties);
251345
}
252346

253347
/**
@@ -261,15 +355,34 @@ public static ManifestWriter<DeleteFile> writeDeleteManifest(
261355
*/
262356
public static ManifestWriter<DeleteFile> writeDeleteManifest(
263357
int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile, Long snapshotId) {
358+
return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, Map.of());
359+
}
360+
361+
/**
362+
* Create a new {@link ManifestWriter} for the given format version.
363+
*
364+
* @param formatVersion a target format version
365+
* @param spec a {@link PartitionSpec}
366+
* @param outputFile an {@link EncryptedOutputFile} where the manifest will be written
367+
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
368+
* @param tableProperties a map of table properties
369+
* @return a manifest writer
370+
*/
371+
public static ManifestWriter<DeleteFile> writeDeleteManifest(
372+
int formatVersion,
373+
PartitionSpec spec,
374+
EncryptedOutputFile outputFile,
375+
Long snapshotId,
376+
Map<String, String> tableProperties) {
264377
switch (formatVersion) {
265378
case 1:
266379
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
267380
case 2:
268-
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
381+
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, tableProperties);
269382
case 3:
270-
return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId);
383+
return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId, tableProperties);
271384
case 4:
272-
return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId);
385+
return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId, tableProperties);
273386
}
274387
throw new UnsupportedOperationException(
275388
"Cannot write manifest for table version: " + formatVersion);
@@ -323,7 +436,8 @@ static ManifestFile copyAppendManifest(
323436
Map<Integer, PartitionSpec> specsById,
324437
EncryptedOutputFile outputFile,
325438
long snapshotId,
326-
SnapshotSummary.Builder summaryBuilder) {
439+
SnapshotSummary.Builder summaryBuilder,
440+
Map<String, String> tableProperties) {
327441
// use metadata that will add the current snapshot's ID for the rewrite
328442
// read first_row_id as null because this copies the incoming manifest before commit
329443
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
@@ -337,7 +451,8 @@ static ManifestFile copyAppendManifest(
337451
outputFile,
338452
snapshotId,
339453
summaryBuilder,
340-
ManifestEntry.Status.ADDED);
454+
ManifestEntry.Status.ADDED,
455+
tableProperties);
341456
} catch (IOException e) {
342457
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
343458
}
@@ -351,7 +466,8 @@ static ManifestFile copyRewriteManifest(
351466
Map<Integer, PartitionSpec> specsById,
352467
EncryptedOutputFile outputFile,
353468
long snapshotId,
354-
SnapshotSummary.Builder summaryBuilder) {
469+
SnapshotSummary.Builder summaryBuilder,
470+
Map<String, String> tableProperties) {
355471
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
356472
// exception if it is not
357473
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
@@ -365,7 +481,8 @@ static ManifestFile copyRewriteManifest(
365481
outputFile,
366482
snapshotId,
367483
summaryBuilder,
368-
ManifestEntry.Status.EXISTING);
484+
ManifestEntry.Status.EXISTING,
485+
tableProperties);
369486
} catch (IOException e) {
370487
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
371488
}
@@ -379,9 +496,11 @@ private static ManifestFile copyManifestInternal(
379496
EncryptedOutputFile outputFile,
380497
long snapshotId,
381498
SnapshotSummary.Builder summaryBuilder,
382-
ManifestEntry.Status allowedEntryStatus) {
499+
ManifestEntry.Status allowedEntryStatus,
500+
Map<String, String> tableProperties) {
383501
ManifestWriter<DataFile> writer =
384-
newWriter(formatVersion, reader.spec(), outputFile, snapshotId, firstRowId);
502+
newWriter(
503+
formatVersion, reader.spec(), outputFile, snapshotId, firstRowId, tableProperties);
385504
boolean threw = true;
386505
try {
387506
for (ManifestEntry<DataFile> entry : reader.entries()) {

0 commit comments

Comments
 (0)