Skip to content

Commit 15f6123

Browse files
author
Zac Sanchez
committed
Support configuring avro compression in ManifestLists
Iceberg table properties are never actually propagated to the Avro writers in Iceberg. As a result, the Avro compression configuration can never be altered, and all Avro files are compressed with gzip. This change adds support for passing table properties to the Avro writers of ManifestList files. Change-Id: I6596db6724dea13f5979c9380232f23dcdc3cfef Reviewed-on: https://gerrit.trading.imc.intra/c/data-engineering/iceberg/+/660815 Tested-by: Teamcity Reviewed-by: Shiva Ganapathy <shiva.ganapathy@imc.com> Reviewed-by: Lorenzo Siega Battel <lorenzo.siegabattel@imc.com> Reviewed-by: Alexander Borodin <alex.borodin@imc.com> Static-Analysis: Teamcity
1 parent bd1b890 commit 15f6123

13 files changed

+136
-32
lines changed

core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public void before() {
7979
0,
8080
1L,
8181
0,
82-
0L)) {
82+
0L,
83+
Map.of())) {
8384
for (int i = 0; i < NUM_FILES; i++) {
8485
OutputFile manifestFile =
8586
org.apache.iceberg.Files.localOutput(

core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ public void writeManifestFile(BenchmarkState state) throws IOException {
108108
0,
109109
1L,
110110
0,
111-
0L)) {
111+
0L,
112+
Map.of())) {
112113
for (int i = 0; i < NUM_FILES; i++) {
113114
OutputFile manifestFile =
114115
org.apache.iceberg.Files.localOutput(

core/src/main/java/org/apache/iceberg/InternalData.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,20 @@ public interface WriteBuilder {
114114
*/
115115
WriteBuilder set(String property, String value);
116116

117+
/**
118+
* Set writer configuration properties from a map.
119+
*
120+
* <p>Write configuration affects writer behavior. To add file metadata properties, use {@link
121+
* #meta(String, String)}.
122+
*
123+
* @param tableProperties a map of writer config properties
124+
* @return this for method chaining
125+
*/
126+
default WriteBuilder set(Map<String, String> tableProperties) {
127+
tableProperties.forEach(this::set);
128+
return this;
129+
}
130+
117131
/**
118132
* Set a file metadata property.
119133
*

core/src/main/java/org/apache/iceberg/ManifestListWriter.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ abstract class ManifestListWriter implements FileAppender<ManifestFile> {
3838
private final OutputFile outputFile;
3939

4040
private ManifestListWriter(
41-
OutputFile file, EncryptionManager encryptionManager, Map<String, String> meta) {
41+
OutputFile file,
42+
EncryptionManager encryptionManager,
43+
Map<String, String> meta,
44+
Map<String, String> tableProperties) {
4245
if (encryptionManager instanceof StandardEncryptionManager) {
4346
// ability to encrypt the manifest list key is introduced for standard encryption.
4447
this.standardEncryptionManager = (StandardEncryptionManager) encryptionManager;
@@ -50,14 +53,13 @@ private ManifestListWriter(
5053
this.outputFile = file;
5154
this.manifestListKeyMetadata = null;
5255
}
53-
54-
this.writer = newAppender(outputFile, meta);
56+
this.writer = newAppender(outputFile, meta, tableProperties);
5557
}
5658

5759
protected abstract ManifestFile prepare(ManifestFile manifest);
5860

5961
protected abstract FileAppender<ManifestFile> newAppender(
60-
OutputFile file, Map<String, String> meta);
62+
OutputFile file, Map<String, String> meta, Map<String, String> tableProps);
6163

6264
@Override
6365
public void add(ManifestFile manifest) {
@@ -114,7 +116,8 @@ static class V4Writer extends ManifestListWriter {
114116
long snapshotId,
115117
Long parentSnapshotId,
116118
long sequenceNumber,
117-
long firstRowId) {
119+
long firstRowId,
120+
Map<String, String> tableProperties) {
118121
super(
119122
snapshotFile,
120123
encryptionManager,
@@ -123,7 +126,8 @@ static class V4Writer extends ManifestListWriter {
123126
"parent-snapshot-id", String.valueOf(parentSnapshotId),
124127
"sequence-number", String.valueOf(sequenceNumber),
125128
"first-row-id", String.valueOf(firstRowId),
126-
"format-version", "4"));
129+
"format-version", "4"),
130+
tableProperties);
127131
this.wrapper = new V4Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
128132
this.nextRowId = firstRowId;
129133
}
@@ -143,12 +147,14 @@ protected ManifestFile prepare(ManifestFile manifest) {
143147
}
144148

145149
@Override
146-
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
150+
protected FileAppender<ManifestFile> newAppender(
151+
OutputFile file, Map<String, String> meta, Map<String, String> tableProperties) {
147152
try {
148153
return InternalData.write(FileFormat.AVRO, file)
149154
.schema(V4Metadata.MANIFEST_LIST_SCHEMA)
150155
.named("manifest_file")
151156
.meta(meta)
157+
.set(tableProperties)
152158
.overwrite()
153159
.build();
154160

@@ -174,7 +180,8 @@ static class V3Writer extends ManifestListWriter {
174180
long snapshotId,
175181
Long parentSnapshotId,
176182
long sequenceNumber,
177-
long firstRowId) {
183+
long firstRowId,
184+
Map<String, String> tableProperties) {
178185
super(
179186
snapshotFile,
180187
encryptionManager,
@@ -183,7 +190,8 @@ static class V3Writer extends ManifestListWriter {
183190
"parent-snapshot-id", String.valueOf(parentSnapshotId),
184191
"sequence-number", String.valueOf(sequenceNumber),
185192
"first-row-id", String.valueOf(firstRowId),
186-
"format-version", "3"));
193+
"format-version", "3"),
194+
tableProperties);
187195
this.wrapper = new V3Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
188196
this.nextRowId = firstRowId;
189197
}
@@ -203,12 +211,14 @@ protected ManifestFile prepare(ManifestFile manifest) {
203211
}
204212

205213
@Override
206-
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
214+
protected FileAppender<ManifestFile> newAppender(
215+
OutputFile file, Map<String, String> meta, Map<String, String> tableProperties) {
207216
try {
208217
return InternalData.write(FileFormat.AVRO, file)
209218
.schema(V3Metadata.MANIFEST_LIST_SCHEMA)
210219
.named("manifest_file")
211220
.meta(meta)
221+
.set(tableProperties)
212222
.overwrite()
213223
.build();
214224

@@ -232,15 +242,17 @@ static class V2Writer extends ManifestListWriter {
232242
EncryptionManager encryptionManager,
233243
long snapshotId,
234244
Long parentSnapshotId,
235-
long sequenceNumber) {
245+
long sequenceNumber,
246+
Map<String, String> tableProperties) {
236247
super(
237248
snapshotFile,
238249
encryptionManager,
239250
ImmutableMap.of(
240251
"snapshot-id", String.valueOf(snapshotId),
241252
"parent-snapshot-id", String.valueOf(parentSnapshotId),
242253
"sequence-number", String.valueOf(sequenceNumber),
243-
"format-version", "2"));
254+
"format-version", "2"),
255+
tableProperties);
244256
this.wrapper = new V2Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
245257
}
246258

@@ -250,12 +262,14 @@ protected ManifestFile prepare(ManifestFile manifest) {
250262
}
251263

252264
@Override
253-
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
265+
protected FileAppender<ManifestFile> newAppender(
266+
OutputFile file, Map<String, String> meta, Map<String, String> tableProperties) {
254267
try {
255268
return InternalData.write(FileFormat.AVRO, file)
256269
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
257270
.named("manifest_file")
258271
.meta(meta)
272+
.set(tableProperties)
259273
.overwrite()
260274
.build();
261275

@@ -273,14 +287,16 @@ static class V1Writer extends ManifestListWriter {
273287
OutputFile snapshotFile,
274288
EncryptionManager encryptionManager,
275289
long snapshotId,
276-
Long parentSnapshotId) {
290+
Long parentSnapshotId,
291+
Map<String, String> tableProperties) {
277292
super(
278293
snapshotFile,
279294
encryptionManager,
280295
ImmutableMap.of(
281296
"snapshot-id", String.valueOf(snapshotId),
282297
"parent-snapshot-id", String.valueOf(parentSnapshotId),
283-
"format-version", "1"));
298+
"format-version", "1"),
299+
tableProperties);
284300
}
285301

286302
@Override
@@ -292,12 +308,14 @@ protected ManifestFile prepare(ManifestFile manifest) {
292308
}
293309

294310
@Override
295-
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
311+
protected FileAppender<ManifestFile> newAppender(
312+
OutputFile file, Map<String, String> meta, Map<String, String> tableProperties) {
296313
try {
297314
return InternalData.write(FileFormat.AVRO, file)
298315
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
299316
.named("manifest_file")
300317
.meta(meta)
318+
.set(tableProperties)
301319
.overwrite()
302320
.build();
303321

core/src/main/java/org/apache/iceberg/ManifestLists.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.util.List;
23+
import java.util.Map;
2324
import org.apache.iceberg.encryption.EncryptionManager;
2425
import org.apache.iceberg.exceptions.RuntimeIOException;
2526
import org.apache.iceberg.io.CloseableIterable;
@@ -55,34 +56,42 @@ static ManifestListWriter write(
5556
long snapshotId,
5657
Long parentSnapshotId,
5758
long sequenceNumber,
58-
Long firstRowId) {
59+
Long firstRowId,
60+
Map<String, String> tableProperties) {
5961
switch (formatVersion) {
6062
case 1:
6163
Preconditions.checkArgument(
6264
sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER,
6365
"Invalid sequence number for v1 manifest list: %s",
6466
sequenceNumber);
6567
return new ManifestListWriter.V1Writer(
66-
manifestListFile, encryptionManager, snapshotId, parentSnapshotId);
68+
manifestListFile, encryptionManager, snapshotId, parentSnapshotId, tableProperties);
6769
case 2:
6870
return new ManifestListWriter.V2Writer(
69-
manifestListFile, encryptionManager, snapshotId, parentSnapshotId, sequenceNumber);
71+
manifestListFile,
72+
encryptionManager,
73+
snapshotId,
74+
parentSnapshotId,
75+
sequenceNumber,
76+
tableProperties);
7077
case 3:
7178
return new ManifestListWriter.V3Writer(
7279
manifestListFile,
7380
encryptionManager,
7481
snapshotId,
7582
parentSnapshotId,
7683
sequenceNumber,
77-
firstRowId);
84+
firstRowId,
85+
tableProperties);
7886
case 4:
7987
return new ManifestListWriter.V4Writer(
8088
manifestListFile,
8189
encryptionManager,
8290
snapshotId,
8391
parentSnapshotId,
8492
sequenceNumber,
85-
firstRowId);
93+
firstRowId,
94+
tableProperties);
8695
}
8796
throw new UnsupportedOperationException(
8897
"Cannot write manifest list for table version: " + formatVersion);

core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
289289
snapshot.snapshotId(),
290290
snapshot.parentId(),
291291
snapshot.sequenceNumber(),
292-
snapshot.firstRowId())) {
292+
snapshot.firstRowId(),
293+
tableMetadata.properties())) {
293294

294295
for (ManifestFile file : manifestFiles) {
295296
ManifestFile newFile = file.copy();

core/src/main/java/org/apache/iceberg/SnapshotProducer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ public Snapshot apply() {
270270
snapshotId(),
271271
parentSnapshotId,
272272
sequenceNumber,
273-
base.nextRowId());
273+
base.nextRowId(),
274+
ops.current().properties());
274275

275276
try (writer) {
276277
// keep track of the manifest lists created

core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,8 @@ private ManifestFile writeAndReadEncryptedManifestList(EncryptionManager em) thr
217217
SNAPSHOT_ID,
218218
SNAPSHOT_ID - 1,
219219
SEQ_NUM,
220-
SNAPSHOT_FIRST_ROW_ID);
220+
SNAPSHOT_FIRST_ROW_ID,
221+
Map.of());
221222
writer.add(TEST_MANIFEST);
222223
writer.close();
223224
ManifestListFile manifestListFile = writer.toManifestListFile();

core/src/test/java/org/apache/iceberg/TestManifestListVersions.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import java.nio.file.Path;
2828
import java.util.Collection;
2929
import java.util.List;
30+
import java.util.Map;
3031
import org.apache.avro.AvroRuntimeException;
3132
import org.apache.avro.generic.GenericData;
3233
import org.apache.avro.generic.GenericRecord;
3334
import org.apache.avro.generic.GenericRecordBuilder;
3435
import org.apache.iceberg.avro.Avro;
36+
import org.apache.iceberg.avro.AvroIterable;
3537
import org.apache.iceberg.avro.AvroSchemaUtil;
3638
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
39+
import org.apache.iceberg.exceptions.RuntimeIOException;
3740
import org.apache.iceberg.inmemory.InMemoryOutputFile;
3841
import org.apache.iceberg.io.CloseableIterable;
3942
import org.apache.iceberg.io.FileAppender;
@@ -445,12 +448,45 @@ public void testManifestsPartitionSummary(int formatVersion) throws IOException
445448
assertThat(second.upperBound()).isEqualTo(secondSummaryUpperBound);
446449
}
447450

451+
@ParameterizedTest
452+
@FieldSource("org.apache.iceberg.TestHelpers#ALL_VERSIONS")
453+
public void testOverrideManifestAvroCompression(int formatVersion) throws IOException {
454+
InputFile nullCompressedFile =
455+
writeManifestList(
456+
formatVersion,
457+
SNAPSHOT_FIRST_ROW_ID,
458+
Map.of("write.avro.compression-codec", "uncompressed"),
459+
TEST_MANIFEST);
460+
AvroIterable<?> nullCompressedManifestFile = readManifestFile(nullCompressedFile);
461+
assertThat(nullCompressedManifestFile.getMetadata().get("avro.codec")).isEqualTo("null");
462+
nullCompressedManifestFile.close();
463+
464+
InputFile snappyCompressedFile =
465+
writeManifestList(
466+
formatVersion,
467+
SNAPSHOT_FIRST_ROW_ID,
468+
Map.of("write.avro.compression-codec", "snappy"),
469+
TEST_MANIFEST);
470+
AvroIterable<?> snappyCompressedManifestFile = readManifestFile(snappyCompressedFile);
471+
assertThat(snappyCompressedManifestFile.getMetadata().get("avro.codec")).isEqualTo("snappy");
472+
snappyCompressedManifestFile.close();
473+
}
474+
448475
private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
449-
return writeManifestList(formatVersion, SNAPSHOT_FIRST_ROW_ID, manifest);
476+
return writeManifestList(formatVersion, SNAPSHOT_FIRST_ROW_ID, Map.of(), manifest);
450477
}
451478

452479
private InputFile writeManifestList(
453480
int formatVersion, long expectedNextRowId, ManifestFile... manifests) throws IOException {
481+
return writeManifestList(formatVersion, expectedNextRowId, Map.of(), manifests);
482+
}
483+
484+
private InputFile writeManifestList(
485+
int formatVersion,
486+
long expectedNextRowId,
487+
Map<String, String> tableProperties,
488+
ManifestFile... manifests)
489+
throws IOException {
454490
OutputFile outputFile = new InMemoryOutputFile();
455491
ManifestListWriter writer =
456492
ManifestLists.write(
@@ -460,7 +496,8 @@ private InputFile writeManifestList(
460496
SNAPSHOT_ID,
461497
SNAPSHOT_ID - 1,
462498
formatVersion > 1 ? SEQ_NUM : 0,
463-
SNAPSHOT_FIRST_ROW_ID);
499+
SNAPSHOT_FIRST_ROW_ID,
500+
tableProperties);
464501

465502
try (writer) {
466503
for (ManifestFile manifest : manifests) {
@@ -498,4 +535,19 @@ private void assertEmptyAvroField(GenericRecord record, String field) {
498535
.isInstanceOf(AvroRuntimeException.class)
499536
.hasMessage("Not a valid schema field: " + field);
500537
}
538+
539+
private static AvroIterable<?> readManifestFile(InputFile inputFile) {
540+
try (AvroIterable<?> manifestListFile =
541+
(AvroIterable<?>)
542+
InternalData.read(FileFormat.AVRO, inputFile)
543+
.setRootType(GenericManifestFile.class)
544+
.setCustomType(
545+
ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, GenericPartitionFieldSummary.class)
546+
.project(ManifestFile.schema())
547+
.build()) {
548+
return manifestListFile;
549+
} catch (IOException e) {
550+
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", inputFile.location());
551+
}
552+
}
501553
}

0 commit comments

Comments
 (0)