Skip to content

Commit 751c888

Browse files
committed
[Improvement] Implement manifest-driven data expiration
1 parent b4cba4b commit 751c888

File tree

11 files changed

+1786
-670
lines changed

11 files changed

+1786
-670
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DataExpirationProcessor.java

Lines changed: 900 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.optimizing.maintainer;
20+
21+
import static org.apache.amoro.iceberg.Constants.INVALID_SNAPSHOT_ID;
22+
import static org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer.AMORO_MAINTAIN_COMMITS;
23+
24+
import org.apache.amoro.api.CommitMetaProducer;
25+
import org.apache.amoro.config.DataExpirationConfig;
26+
import org.apache.amoro.formats.iceberg.IcebergTable;
27+
import org.apache.amoro.optimizing.scan.IcebergTableFileScanHelper;
28+
import org.apache.amoro.optimizing.scan.TableFileScanHelper;
29+
import org.apache.amoro.server.table.TableConfigurations;
30+
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
31+
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
32+
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
33+
import org.apache.iceberg.ContentFile;
34+
import org.apache.iceberg.DataFile;
35+
import org.apache.iceberg.DeleteFile;
36+
import org.apache.iceberg.DeleteFiles;
37+
import org.apache.iceberg.ManifestFile;
38+
import org.apache.iceberg.RewriteFiles;
39+
import org.apache.iceberg.Snapshot;
40+
import org.apache.iceberg.StructLike;
41+
import org.apache.iceberg.Table;
42+
import org.apache.iceberg.expressions.Expression;
43+
import org.apache.iceberg.io.CloseableIterable;
44+
import org.apache.iceberg.io.CloseableIterator;
45+
import org.apache.iceberg.types.Types;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
48+
49+
import java.io.IOException;
50+
import java.time.*;
51+
import java.util.*;
52+
53+
/**
54+
* Data expiration processor for Iceberg tables.
55+
*
56+
* <p>It supports expiring data files and delete files based on a specified expiration field and
57+
* level.
58+
*
59+
* <p>The expiration field must be a partition field if the expiration level is PARTITION.
60+
*
61+
* <p>The expiration field can be a non-partition field if the expiration level is FILE.
62+
*
63+
* <p>The expiration field can be of type Timestamp, Long (epoch millis/seconds) or String
64+
* (formatted date time).
65+
*/
66+
public class IcebergExpirationProcessor extends DataExpirationProcessor {
67+
private static final Logger LOG = LoggerFactory.getLogger(IcebergExpirationProcessor.class);
68+
69+
private final Table table;
70+
private final Types.NestedField expirationField;
71+
72+
public IcebergExpirationProcessor(Table table, DataExpirationConfig expirationConfig) {
73+
super(
74+
expirationConfig,
75+
dataFile -> {
76+
if (table instanceof IcebergTable) {
77+
return dataFile.path();
78+
} else {
79+
return dataFile.copyWithoutStats();
80+
}
81+
});
82+
this.table = table;
83+
this.expirationField = table.schema().findField(expirationConfig.getExpirationField());
84+
TableConfigurations.validateExpirationField(expirationField, table.name());
85+
}
86+
87+
@Override
88+
public void process(Instant expireInstant) {
89+
Snapshot currentSnapshot = table.currentSnapshot();
90+
if (currentSnapshot == null || currentSnapshot.snapshotId() == INVALID_SNAPSHOT_ID) {
91+
LOG.debug("Table {} has no valid snapshot, skip data expiration", table.name());
92+
return;
93+
}
94+
95+
Map<String, String> snapshotSummary = currentSnapshot.summary();
96+
if (snapshotSummary.values().stream().anyMatch(AMORO_MAINTAIN_COMMITS::contains)) {
97+
LOG.debug(
98+
"{}'s last snapshot {} was maintained, there are no incremental changes, skip data expiration",
99+
table.name(),
100+
currentSnapshot.snapshotId());
101+
return;
102+
}
103+
104+
long expireTimestamp = expireInstant.toEpochMilli();
105+
DataExpirationConfig.ExpireLevel level = config.getExpirationLevel();
106+
if (level == DataExpirationConfig.ExpireLevel.PARTITION && !table.spec().isPartitioned()) {
107+
throw new UnsupportedOperationException(
108+
"Partition-level expiration is not supported for unpartitioned table " + table.name());
109+
}
110+
111+
try {
112+
if (level == DataExpirationConfig.ExpireLevel.FILE) {
113+
expireTableFiles(expireTimestamp, currentSnapshot);
114+
} else if (level == DataExpirationConfig.ExpireLevel.PARTITION) {
115+
expireTablePartitions(expireTimestamp, currentSnapshot);
116+
}
117+
} catch (Exception e) {
118+
LOG.error(
119+
"Failed to expire data files in table {} with level {}, before {}",
120+
table.name(),
121+
level,
122+
expireTimestamp,
123+
e);
124+
throw new RuntimeException(
125+
"Failed to expire data files in table " + table.name() + "(level=" + level + ")", e);
126+
}
127+
}
128+
129+
private void expireTableFiles(long expireTimestamp, Snapshot currentSnapshot) throws IOException {
130+
Expression filter = buildExpirationFilter(expireTimestamp);
131+
IcebergTableFileScanHelper scanHelper =
132+
new IcebergTableFileScanHelper(table, currentSnapshot.snapshotId());
133+
long expiredFileCount = 0L;
134+
135+
List<Object> dataFilesToExpire = Lists.newArrayList();
136+
List<DeleteFile> deleteFilesToExpire = Lists.newArrayList();
137+
try (CloseableIterable<TableFileScanHelper.FileScanResult> iterable =
138+
scanHelper.withPartitionFilter(filter).scan()) {
139+
try (CloseableIterator<TableFileScanHelper.FileScanResult> iterator = iterable.iterator()) {
140+
while (iterator.hasNext()) {
141+
TableFileScanHelper.FileScanResult result = iterator.next();
142+
dataFilesToExpire.add(dataFileDeleteFunc.apply(result.file()));
143+
List<ContentFile<?>> deleteFiles = result.deleteFiles();
144+
deleteFiles.forEach(d -> deleteFilesToExpire.add(((DeleteFile) d).copyWithoutStats()));
145+
expiredFileCount += 1 + deleteFiles.size();
146+
147+
if (exceedMaxSize(expiredFileCount)) {
148+
break;
149+
}
150+
}
151+
}
152+
}
153+
154+
commitExpiration(dataFilesToExpire, deleteFilesToExpire, "null", currentSnapshot.snapshotId());
155+
}
156+
157+
private void expireTablePartitions(long expireTimestamp, Snapshot currentSnapshot)
158+
throws IOException {
159+
List<ManifestFile> manifestFiles = currentSnapshot.allManifests(table.io());
160+
if (manifestFiles.isEmpty()) {
161+
LOG.debug("No manifests found in table {}, skip data expiration", table.name());
162+
return;
163+
}
164+
165+
ManifestsCollection manifestsCollection = buildManifestsIndex(manifestFiles, expireTimestamp);
166+
if (manifestsCollection.totalSize() == 0) {
167+
LOG.info(
168+
"No candidate manifests found <= {} in the table {}, skip data expiration",
169+
expireTimestamp,
170+
table.name());
171+
return;
172+
}
173+
174+
Map<StructLike, List<Object>> dataFilesToExpire = Maps.newHashMap();
175+
Map<StructLike, List<DeleteFile>> deleteFilesToExpire = Maps.newHashMap();
176+
long expiredFileCount = 0L;
177+
collectExpiredFiles(
178+
table,
179+
manifestsCollection,
180+
dataFilesToExpire,
181+
deleteFilesToExpire,
182+
expireTimestamp,
183+
expiredFileCount);
184+
185+
// expire data files and delete files partition by partition
186+
Set<StructLike> dataPartitions = dataFilesToExpire.keySet();
187+
Set<StructLike> deletePartitions = deleteFilesToExpire.keySet();
188+
Set<StructLike> allPartitions = Sets.union(dataPartitions, deletePartitions);
189+
for (StructLike partition : allPartitions) {
190+
List<Object> dataFiles = dataFilesToExpire.getOrDefault(partition, Collections.emptyList());
191+
List<DeleteFile> deleteFiles =
192+
deleteFilesToExpire.getOrDefault(partition, Collections.emptyList());
193+
String partitionString = table.spec().partitionToPath(partition);
194+
commitExpiration(dataFiles, deleteFiles, partitionString, currentSnapshot.snapshotId());
195+
}
196+
}
197+
198+
ManifestsCollection buildManifestsIndex(List<ManifestFile> manifestFiles, long expireTimestamp) {
199+
ManifestsCollection manifestsCollection = new ManifestsCollection(manifestFiles.size(), config);
200+
// Find the partition field that matches the expiration field
201+
Optional<PartitionFieldInfo> partitionFieldOp =
202+
findFieldInSpec(table.spec(), expirationField.fieldId());
203+
204+
if (partitionFieldOp.isEmpty()) {
205+
LOG.warn(
206+
"Expiration field: {} is not used for partitioning, cannot extract from manifest",
207+
expirationField.name());
208+
throw new UnsupportedOperationException(
209+
"Expiration field " + expirationField.name() + " is not used for partitioning");
210+
}
211+
212+
PartitionFieldInfo partitionFieldInfo = partitionFieldOp.get();
213+
214+
for (ManifestFile manifestFile : manifestFiles) {
215+
int manifestSpecId = manifestFile.partitionSpecId();
216+
if (manifestSpecId != partitionFieldInfo.specId) {
217+
// manifest's spec is different from current table spec, try to find the field in manifest's
218+
// spec
219+
Optional<PartitionFieldInfo> manifestFieldOp =
220+
findFieldInSpec(table.specs().get(manifestSpecId), expirationField.fieldId());
221+
if (manifestFieldOp.isEmpty()) {
222+
// manifest's spec does not have the expiration field, skip it
223+
continue;
224+
}
225+
226+
partitionFieldInfo = manifestFieldOp.get();
227+
}
228+
229+
List<ManifestFile.PartitionFieldSummary> partitionSummaries = manifestFile.partitions();
230+
ManifestFile.PartitionFieldSummary summary = partitionSummaries.get(partitionFieldInfo.index);
231+
PartitionRange partitionRange =
232+
new PartitionRange(summary, partitionFieldInfo.field, expirationField.type(), config);
233+
if (partitionRange.lowerBoundGt(expireTimestamp)) {
234+
// this manifest's partition range is all greater than expire timestamp, skip it
235+
continue;
236+
}
237+
238+
ManifestFileWrapper wrapper = new ManifestFileWrapper(manifestFile, partitionRange);
239+
manifestsCollection.add(wrapper, expireTimestamp);
240+
}
241+
242+
LOG.info(
243+
"Found {} candidate manifests that less than or equal to expire timestamp {} in table {}",
244+
manifestsCollection.totalSize(),
245+
expireTimestamp,
246+
table.name());
247+
248+
return manifestsCollection;
249+
}
250+
251+
private void commitExpiration(
252+
List<Object> dataFiles, List<DeleteFile> deleteFiles, String partition, long snapshotId) {
253+
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
254+
LOG.info("No files to expire in table {}", table.name());
255+
return;
256+
}
257+
258+
// expire data files
259+
DeleteFiles delete = table.newDelete();
260+
dataFiles.forEach(
261+
d -> {
262+
if (d instanceof String) {
263+
delete.deleteFile((String) d);
264+
} else if (d instanceof DataFile) {
265+
delete.deleteFile((DataFile) d);
266+
}
267+
});
268+
delete.set(
269+
org.apache.amoro.op.SnapshotSummary.SNAPSHOT_PRODUCER,
270+
CommitMetaProducer.DATA_EXPIRATION.name());
271+
delete.commit();
272+
// expire delete files
273+
if (!deleteFiles.isEmpty()) {
274+
RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(snapshotId);
275+
deleteFiles.forEach(rewriteFiles::deleteFile);
276+
rewriteFiles.set(
277+
org.apache.amoro.op.SnapshotSummary.SNAPSHOT_PRODUCER,
278+
CommitMetaProducer.DATA_EXPIRATION.name());
279+
rewriteFiles.commit();
280+
}
281+
282+
LOG.info(
283+
"Expired {} {} data files and {} delete files{}",
284+
table.name(),
285+
dataFiles.size(),
286+
deleteFiles.size(),
287+
partition.equals("null") ? "" : ", in the partition: " + partition);
288+
}
289+
290+
@Override
291+
public Types.NestedField expirationField() {
292+
return expirationField;
293+
}
294+
}

0 commit comments

Comments
 (0)