Skip to content

Commit b0722bb

Browse files
committed
WriterFunction hides engine type
1 parent 54ff177 commit b0722bb

File tree

18 files changed

+141
-224
lines changed

18 files changed

+141
-224
lines changed

core/src/main/java/org/apache/iceberg/avro/Avro.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,6 @@ public static class WriteBuilder
121121
private MetricsConfig metricsConfig;
122122
private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
123123
private FileContent content;
124-
private Class<?> inputSchemaClass = null;
125-
private Object inputSchema = null;
126124

127125
private WriteBuilder(OutputFile file) {
128126
this.file = file;
@@ -141,24 +139,6 @@ public WriteBuilder schema(org.apache.iceberg.Schema newSchema) {
141139
return this;
142140
}
143141

144-
WriteBuilder inputSchemaClass(Class<?> newInputSchemaClass) {
145-
this.inputSchemaClass = newInputSchemaClass;
146-
return this;
147-
}
148-
149-
@Override
150-
public WriteBuilder inputSchema(Object newInputSchema) {
151-
Preconditions.checkNotNull(
152-
inputSchemaClass, "Input schema class must be set before setting the input schema");
153-
Preconditions.checkArgument(
154-
inputSchemaClass.isInstance(newInputSchema),
155-
"Input schema must be of class: %s, found: %s",
156-
inputSchemaClass.getName(),
157-
newInputSchema.getClass().getName());
158-
this.inputSchema = newInputSchema;
159-
return this;
160-
}
161-
162142
@Override
163143
public WriteBuilder named(String newName) {
164144
this.name = newName;
@@ -249,15 +229,11 @@ public <D> FileAppender<D> build() throws IOException {
249229
case DATA:
250230
Preconditions.checkNotNull(schema, "Schema is required");
251231
Preconditions.checkNotNull(name, "Table name is required and cannot be null");
252-
Preconditions.checkState(writerFunction != null, "Writer function has to be set.");
253-
this.createWriterFunc = avroSchema -> writerFunction.apply(avroSchema, inputSchema);
254232
this.createContextFunc = Context::dataContext;
255233
break;
256234
case EQUALITY_DELETES:
257235
Preconditions.checkNotNull(schema, "Schema is required");
258236
Preconditions.checkNotNull(name, "Table name is required and cannot be null");
259-
Preconditions.checkState(writerFunction != null, "Writer function has to be set.");
260-
this.createWriterFunc = avroSchema -> writerFunction.apply(avroSchema, inputSchema);
261237
this.createContextFunc = Context::deleteContext;
262238
break;
263239
case POSITION_DELETES:

core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,9 @@ public Class<S> schemaType() {
6767
}
6868

6969
@Override
70-
public WriteBuilder writeBuilder(OutputFile outputFile) {
70+
public WriteBuilder writeBuilder(OutputFile outputFile, Schema icebergSchema, S inputSchema) {
7171
return Avro.write(outputFile)
72-
.inputSchemaClass(schemaType)
73-
.writerFunction(
74-
(schema, writerSchemaType) -> writerFunction.apply(schema, (S) writerSchemaType));
72+
.createWriterFunc(schema -> writerFunction.apply(schema, inputSchema));
7573
}
7674

7775
@Override

core/src/main/java/org/apache/iceberg/formats/ContentFileWriteBuilderImpl.java

Lines changed: 92 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
2323
import java.util.List;
24+
import java.util.Map;
2425
import java.util.Objects;
2526
import java.util.stream.Collectors;
2627
import java.util.stream.IntStream;
28+
import org.apache.iceberg.FileContent;
2729
import org.apache.iceberg.FileFormat;
2830
import org.apache.iceberg.Metrics;
2931
import org.apache.iceberg.MetricsConfig;
@@ -34,10 +36,12 @@
3436
import org.apache.iceberg.deletes.EqualityDeleteWriter;
3537
import org.apache.iceberg.deletes.PositionDelete;
3638
import org.apache.iceberg.deletes.PositionDeleteWriter;
39+
import org.apache.iceberg.encryption.EncryptedOutputFile;
3740
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
3841
import org.apache.iceberg.io.DataWriter;
3942
import org.apache.iceberg.io.FileAppender;
4043
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
44+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4145

4246
/**
4347
* An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants.
@@ -61,69 +65,77 @@
6165
*/
6266
abstract class ContentFileWriteBuilderImpl<B extends ContentFileWriteBuilder<B>, D, S>
6367
implements ContentFileWriteBuilder<B> {
64-
private final WriteBuilder writeBuilder;
65-
private final String location;
68+
private final FormatModel<D, S> formatModel;
69+
private final EncryptedOutputFile outputFile;
6670
private final FileFormat format;
6771
private PartitionSpec spec = null;
6872
private StructLike partition = null;
6973
private EncryptionKeyMetadata keyMetadata = null;
7074
private SortOrder sortOrder = null;
75+
private Map<String, String> properties = Maps.newHashMap();
76+
private Map<String, String> metadata = Maps.newHashMap();
77+
private MetricsConfig metricsConfig = null;
78+
private boolean overwrite = false;
79+
private ByteBuffer encryptionKey = null;
80+
private ByteBuffer aadPrefix = null;
7181

7282
static <D, S> DataWriteBuilder<D, S> forDataFile(
73-
WriteBuilder writeBuilder, String location, FileFormat format) {
74-
return new DataFileWriteBuilder<>(writeBuilder, location, format);
83+
FormatModel<D, S> formatModel, EncryptedOutputFile outputFile, FileFormat format) {
84+
return new DataFileWriteBuilder<>(formatModel, outputFile, format);
7585
}
7686

7787
static <D, S> EqualityDeleteWriteBuilder<D, S> forEqualityDelete(
78-
WriteBuilder writeBuilder, String location, FileFormat format) {
79-
return new EqualityDeleteFileWriteBuilder<>(writeBuilder, location, format);
88+
FormatModel<D, S> formatModel, EncryptedOutputFile outputFile, FileFormat format) {
89+
return new EqualityDeleteFileWriteBuilder<>(formatModel, outputFile, format);
8090
}
8191

8292
static PositionDeleteWriteBuilder forPositionDelete(
83-
WriteBuilder writeBuilder, String location, FileFormat format) {
84-
return new PositionDeleteFileWriteBuilder(writeBuilder, location, format);
93+
FormatModel<PositionDelete<?>, Object> formatModel,
94+
EncryptedOutputFile outputFile,
95+
FileFormat format) {
96+
return new PositionDeleteFileWriteBuilder(formatModel, outputFile, format);
8597
}
8698

8799
private ContentFileWriteBuilderImpl(
88-
WriteBuilder writeBuilder, String location, FileFormat format) {
89-
this.writeBuilder = writeBuilder;
90-
this.location = location;
100+
FormatModel<D, S> formatModel, EncryptedOutputFile outputFile, FileFormat format) {
101+
this.formatModel = formatModel;
102+
this.outputFile = outputFile;
91103
this.format = format;
92104
}
93105

94106
@Override
95107
public B set(String property, String value) {
96-
writeBuilder.set(property, value);
108+
properties.put(property, value);
97109
return self();
98110
}
99111

100112
@Override
101113
public B meta(String property, String value) {
102-
writeBuilder.meta(property, value);
114+
metadata.put(property, value);
103115
return self();
104116
}
105117

106118
@Override
107-
public B metricsConfig(MetricsConfig metricsConfig) {
108-
writeBuilder.metricsConfig(metricsConfig);
119+
public B metricsConfig(MetricsConfig newMetricsConfig) {
120+
this.metricsConfig = newMetricsConfig;
109121
return self();
110122
}
111123

112124
@Override
113125
public B overwrite() {
114-
writeBuilder.overwrite();
126+
this.overwrite = true;
115127
return self();
116128
}
117129

118130
@Override
119-
public B withFileEncryptionKey(ByteBuffer encryptionKey) {
120-
writeBuilder.withFileEncryptionKey(encryptionKey);
131+
public B withFileEncryptionKey(ByteBuffer newEncryptionKey) {
132+
this.encryptionKey = newEncryptionKey;
121133
return self();
122134
}
123135

124136
@Override
125-
public B withAADPrefix(ByteBuffer aadPrefix) {
126-
writeBuilder.withAADPrefix(aadPrefix);
137+
public B withAADPrefix(ByteBuffer newAadPrefix) {
138+
this.aadPrefix = newAadPrefix;
127139
return self();
128140
}
129141

@@ -151,22 +163,43 @@ public B sortOrder(SortOrder newSortOrder) {
151163
return self();
152164
}
153165

166+
private void init(WriteBuilder writeBuilder) {
167+
if (metricsConfig != null) {
168+
writeBuilder.metricsConfig(metricsConfig);
169+
}
170+
if (overwrite) {
171+
writeBuilder.overwrite();
172+
}
173+
if (encryptionKey != null) {
174+
writeBuilder.withFileEncryptionKey(encryptionKey);
175+
}
176+
if (aadPrefix != null) {
177+
writeBuilder.withAADPrefix(aadPrefix);
178+
}
179+
180+
writeBuilder.setAll(properties).meta(metadata);
181+
}
182+
154183
private static class DataFileWriteBuilder<D, S>
155184
extends ContentFileWriteBuilderImpl<DataWriteBuilder<D, S>, D, S>
156185
implements DataWriteBuilder<D, S> {
157-
private DataFileWriteBuilder(WriteBuilder writeBuilder, String location, FileFormat format) {
158-
super(writeBuilder, location, format);
186+
private Schema schema = null;
187+
private S inputSchema = null;
188+
189+
private DataFileWriteBuilder(
190+
FormatModel<D, S> formatModel, EncryptedOutputFile outputFile, FileFormat format) {
191+
super(formatModel, outputFile, format);
159192
}
160193

161194
@Override
162-
public DataFileWriteBuilder<D, S> schema(Schema schema) {
163-
super.writeBuilder.schema(schema);
195+
public DataFileWriteBuilder<D, S> schema(Schema newSchema) {
196+
this.schema = newSchema;
164197
return this;
165198
}
166199

167200
@Override
168-
public DataFileWriteBuilder<D, S> inputSchema(S schema) {
169-
super.writeBuilder.inputSchema(schema);
201+
public DataFileWriteBuilder<D, S> inputSchema(S newInputSchema) {
202+
this.inputSchema = newInputSchema;
170203
return this;
171204
}
172205

@@ -182,10 +215,16 @@ public DataWriter<D> build() throws IOException {
182215
super.spec.isUnpartitioned() || super.partition != null,
183216
"Partition must not be null when creating data writer for partitioned spec");
184217

218+
WriteBuilder writeBuilder =
219+
super.formatModel
220+
.writeBuilder(super.outputFile.encryptingOutputFile(), schema, inputSchema)
221+
.schema(schema)
222+
.content(FileContent.DATA);
223+
super.init(writeBuilder);
185224
return new DataWriter<>(
186-
super.writeBuilder.build(),
225+
writeBuilder.build(),
187226
super.format,
188-
super.location,
227+
super.outputFile.encryptingOutputFile().location(),
189228
super.spec,
190229
super.partition,
191230
super.keyMetadata,
@@ -196,23 +235,18 @@ public DataWriter<D> build() throws IOException {
196235
private static class EqualityDeleteFileWriteBuilder<D, S>
197236
extends ContentFileWriteBuilderImpl<EqualityDeleteWriteBuilder<D, S>, D, S>
198237
implements EqualityDeleteWriteBuilder<D, S> {
238+
private S inputSchema = null;
199239
private Schema rowSchema = null;
200240
private int[] equalityFieldIds = null;
201241

202242
private EqualityDeleteFileWriteBuilder(
203-
WriteBuilder writeBuilder, String location, FileFormat format) {
204-
super(writeBuilder, location, format);
205-
}
206-
207-
@Override
208-
public EqualityDeleteFileWriteBuilder<D, S> schema(Schema schema) {
209-
super.writeBuilder.schema(schema);
210-
return this;
243+
FormatModel<D, S> formatModel, EncryptedOutputFile outputFile, FileFormat format) {
244+
super(formatModel, outputFile, format);
211245
}
212246

213247
@Override
214248
public EqualityDeleteFileWriteBuilder<D, S> inputSchema(S schema) {
215-
super.writeBuilder.inputSchema(schema);
249+
this.inputSchema = schema;
216250
return this;
217251
}
218252

@@ -245,8 +279,15 @@ public EqualityDeleteWriter<D> build() throws IOException {
245279
super.spec.isUnpartitioned() || super.partition != null,
246280
"Partition must not be null for partitioned writes");
247281

282+
WriteBuilder writeBuilder =
283+
super.formatModel
284+
.writeBuilder(super.outputFile.encryptingOutputFile(), rowSchema, inputSchema)
285+
.schema(rowSchema)
286+
.content(FileContent.EQUALITY_DELETES);
287+
super.init(writeBuilder);
288+
248289
return new EqualityDeleteWriter<>(
249-
super.writeBuilder
290+
writeBuilder
250291
.schema(rowSchema)
251292
.meta("delete-type", "equality")
252293
.meta(
@@ -256,7 +297,7 @@ public EqualityDeleteWriter<D> build() throws IOException {
256297
.collect(Collectors.joining(", ")))
257298
.build(),
258299
super.format,
259-
super.location,
300+
super.outputFile.encryptingOutputFile().location(),
260301
super.spec,
261302
super.partition,
262303
super.keyMetadata,
@@ -270,8 +311,10 @@ private static class PositionDeleteFileWriteBuilder
270311
implements PositionDeleteWriteBuilder {
271312

272313
private PositionDeleteFileWriteBuilder(
273-
WriteBuilder writeBuilder, String location, FileFormat format) {
274-
super(writeBuilder, location, format);
314+
FormatModel<PositionDelete<?>, Object> formatModel,
315+
EncryptedOutputFile outputFile,
316+
FileFormat format) {
317+
super(formatModel, outputFile, format);
275318
}
276319

277320
@Override
@@ -287,11 +330,16 @@ public PositionDeleteWriter<?> build() throws IOException {
287330
super.spec.isUnpartitioned() || super.partition != null,
288331
"Partition must not be null for partitioned writes");
289332

333+
WriteBuilder writeBuilder =
334+
super.formatModel
335+
.writeBuilder(super.outputFile.encryptingOutputFile(), null, null)
336+
.content(FileContent.POSITION_DELETES);
337+
super.init(writeBuilder);
338+
290339
return new PositionDeleteWriter<>(
291-
new PositionDeleteFileAppender(
292-
super.writeBuilder.meta("delete-type", "position").build()),
340+
new PositionDeleteFileAppender(writeBuilder.meta("delete-type", "position").build()),
293341
super.format,
294-
super.location,
342+
super.outputFile.encryptingOutputFile().location(),
295343
super.spec,
296344
super.partition,
297345
super.keyMetadata);

core/src/main/java/org/apache/iceberg/formats/EqualityDeleteWriteBuilder.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,9 @@
4040
public interface EqualityDeleteWriteBuilder<D, S>
4141
extends ContentFileWriteBuilder<EqualityDeleteWriteBuilder<D, S>> {
4242

43-
/** Set the file schema. */
44-
EqualityDeleteWriteBuilder<D, S> schema(Schema schema);
45-
4643
/**
4744
* Sets the input schema accepted by the writer. If not provided, it is derived from the {@link
48-
* #schema(Schema)}.
45+
* #rowSchema(Schema)}.
4946
*/
5047
EqualityDeleteWriteBuilder<D, S> inputSchema(S schema);
5148

core/src/main/java/org/apache/iceberg/formats/FormatModel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iceberg.formats;
2020

2121
import org.apache.iceberg.FileFormat;
22+
import org.apache.iceberg.Schema;
2223
import org.apache.iceberg.io.InputFile;
2324
import org.apache.iceberg.io.OutputFile;
2425

@@ -71,7 +72,7 @@ public interface FormatModel<D, S> {
7172
* @param outputFile destination for the written data
7273
* @return configured writer builder
7374
*/
74-
WriteBuilder writeBuilder(OutputFile outputFile);
75+
WriteBuilder writeBuilder(OutputFile outputFile, Schema icebergSchema, S engineSchema);
7576

7677
/**
7778
* Creates a file reader builder for the specified input file.

0 commit comments

Comments
 (0)