Skip to content

Commit 7313451

Browse files
committed
feat(plugin): add new RegexRouterCleanupPolicy
This commit contains some minor internal changes
1 parent 7851c21 commit 7313451

File tree

10 files changed

+332
-13
lines changed

10 files changed

+332
-13
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.config;
20+
21+
import org.apache.kafka.common.config.AbstractConfig;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
24+
import java.util.Map;
25+
26+
public class SimpleConfig extends AbstractConfig {
27+
28+
public SimpleConfig(ConfigDef configDef, Map<?, ?> originals) {
29+
super(configDef, originals, false);
30+
}
31+
32+
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/FileSystemMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public interface FileSystemMonitor {
3838
void invoke(final ConnectorContext context);
3939

4040
/**
41-
* Enables or disables the the file-listing process either temporarily or permanently.
41+
* Enables or disables the file-listing process either temporarily or permanently.
4242
* In other words, if disabled then {@link #listFilesToSchedule()} will always return an empty list.
4343
*
4444
* @param enabled is the file-listing process enabled.

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLNodeToStructConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public XMLNodeToStructConverter setTypeInferenceEnabled(final boolean isTypeInfe
134134
}
135135

136136
/**
137-
* Converts the given {@link Node} object tree into a new new {@link TypedStruct} instance.
137+
* Converts the given {@link Node} object tree into a new {@link TypedStruct} instance.
138138
*
139139
* @param node the {@link Node} object tree to convert.
140140
* @return the new {@link TypedStruct} instance.

connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
2222
import io.streamthoughts.kafka.connect.filepulse.fs.codec.CodecHandler;
2323
import io.streamthoughts.kafka.connect.filepulse.fs.codec.CodecManager;
24-
import io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalFileStorage;
2524
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
2625
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
2726
import org.apache.kafka.connect.errors.ConnectException;
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
19+
package io.streamthoughts.kafka.connect.filepulse.fs;
2020

21-
import io.streamthoughts.kafka.connect.filepulse.fs.Storage;
2221
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
2322
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
2423
import org.slf4j.Logger;
@@ -85,15 +84,15 @@ public boolean move(final URI source, final URI dest) {
8584
final Path sourcePath = Paths.get(source);
8685
final Path destPath = Paths.get(dest);
8786
try {
88-
LOG.info("Moving file {} to {}", source, dest);
87+
LOG.info("Moving file '{}' to '{}'.", source, dest);
8988
createParentIfNotExists(destPath);
9089
Files.move(sourcePath, destPath, StandardCopyOption.ATOMIC_MOVE);
91-
LOG.info("File {} moved successfully", source);
90+
LOG.info("File '{}' moved successfully", source);
9291
} catch (IOException outer) {
9392
try {
9493
Files.move(sourcePath, destPath, StandardCopyOption.REPLACE_EXISTING);
9594
LOG.debug(
96-
"Non-atomic move of {} to {} succeeded after atomic move failed due to {}",
95+
"Non-atomic move of '{}' to '{}' succeeded after atomic move failed due to '{}'",
9796
source,
9897
destPath,
9998
outer.getMessage()
@@ -102,14 +101,14 @@ public boolean move(final URI source, final URI dest) {
102101
inner.addSuppressed(outer);
103102
try {
104103
doSimpleMove(sourcePath, destPath);
105-
LOG.debug("Simple move as copy+delete of {} to {} succeeded after move failed due to {}",
104+
LOG.debug("Simple move as copy+delete of '{}' to '{}' succeeded after move failed due to '{}'",
106105
source,
107106
dest,
108107
inner.getMessage()
109108
);
110109
} catch (IOException e) {
111110
e.addSuppressed(inner);
112-
LOG.error("Error while moving file {}", source, inner);
111+
LOG.error("Error while moving file '{}'", source, inner);
113112
return false;
114113
}
115114
}

connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/BaseLocalFileInputReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFileStorage;
2122
import io.streamthoughts.kafka.connect.filepulse.reader.StorageAwareFileInputReader;
2223

2324
public abstract class BaseLocalFileInputReader

connect-file-pulse-filesystems/filepulse-local-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/LocalMoveCleanupPolicyTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
package io.streamthoughts.kafka.connect.filepulse.fs.clean;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListingConfig;
22-
import io.streamthoughts.kafka.connect.filepulse.fs.clean.LocalMoveCleanupPolicy;
23-
import io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalFileStorage;
22+
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFileStorage;
2423
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
2524
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2625
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs.clean;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
22+
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
23+
import io.streamthoughts.kafka.connect.filepulse.config.SimpleConfig;
24+
import io.streamthoughts.kafka.connect.filepulse.fs.Storage;
25+
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
26+
import org.apache.kafka.common.config.ConfigDef;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.net.URI;
31+
import java.util.Map;
32+
import java.util.regex.Matcher;
33+
import java.util.regex.Pattern;
34+
35+
/**
36+
* Policy for printing into log files completed files.
37+
*/
38+
public final class RegexRouterCleanupPolicy implements FileCleanupPolicy {
39+
40+
private static final String CONFIG_PREFIX = "fs.cleanup.policy.router.";
41+
42+
public static final String SUCCESS_ROUTE_TOPIC_REGEX_CONFIG = CONFIG_PREFIX + "success.uri.regex";
43+
private static final String SUCCESS_ROUTE_TOPIC_REGEX_DOC =
44+
"Regular expression to use for matching objects in success.";
45+
public static final String SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG = CONFIG_PREFIX + "success.uri.replacement";
46+
private static final String SUCCESS_ROUTE_TOPIC_REPLACEMENT_DOC = "Replacement string.";
47+
48+
public static final String FAILURE_ROUTE_TOPIC_REGEX_CONFIG = CONFIG_PREFIX + "failure.uri.regex";
49+
private static final String FAILURE_ROUTE_TOPIC_REGEX_DOC =
50+
"Regular expression to use for matching objects in failure.";
51+
public static final String FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG = CONFIG_PREFIX + "failure.uri.replacement";
52+
private static final String FAILURE_ROUTE_TOPIC_REPLACEMENT_DOC = "Replacement string.";
53+
54+
private static final Logger LOG = LoggerFactory.getLogger(LogCleanupPolicy.class);
55+
56+
private String successReplacement;
57+
private Pattern successRegex;
58+
59+
private String failureReplacement;
60+
private Pattern failureRegex;
61+
62+
private Storage storage;
63+
64+
/**
65+
* {@inheritDoc}
66+
*/
67+
@Override
68+
public void configure(final Map<String, ?> configs) {
69+
SimpleConfig simpleConfig = new SimpleConfig(configDef(), configs);
70+
successReplacement = simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG);
71+
successRegex = Pattern.compile(simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REGEX_CONFIG));
72+
73+
failureReplacement = simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG);
74+
failureRegex = Pattern.compile(simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REGEX_CONFIG));
75+
}
76+
77+
/**
78+
* {@inheritDoc}
79+
*/
80+
public boolean onSuccess(final FileObject source) {
81+
URI sourceURI = source.metadata().uri();
82+
return storage.move(sourceURI, routeOnSuccess(sourceURI));
83+
}
84+
85+
/**
86+
* {@inheritDoc}
87+
*/
88+
public boolean onFailure(final FileObject source) {
89+
URI sourceURI = source.metadata().uri();
90+
return storage.move(sourceURI, routeOnFailure(sourceURI));
91+
}
92+
93+
@VisibleForTesting
94+
URI routeOnSuccess(URI sourceURI) {
95+
return route(sourceURI.toString(), successReplacement, successRegex);
96+
}
97+
98+
@VisibleForTesting
99+
URI routeOnFailure(URI sourceURI) {
100+
return route(sourceURI.toString(), failureReplacement, failureRegex);
101+
}
102+
103+
/**
104+
* {@inheritDoc}
105+
*/
106+
@Override
107+
public void close() {
108+
}
109+
110+
@VisibleForTesting
111+
private URI route(final String originalURI,
112+
final String replacement,
113+
final Pattern regex) {
114+
final Matcher matcher = regex.matcher(originalURI);
115+
116+
String targetURI;
117+
if (matcher.matches()) {
118+
targetURI = matcher.replaceFirst(replacement);
119+
LOG.trace("Rerouting from object-file from '{}' to '{}'", originalURI, targetURI);
120+
} else {
121+
targetURI = originalURI;
122+
LOG.trace("Not rerouting object-file '{}' as it does not match the configured regex", originalURI);
123+
}
124+
125+
return URI.create(targetURI);
126+
}
127+
128+
/**
129+
* {@inheritDoc}
130+
*/
131+
@Override
132+
public void setStorage(final Storage storage) {
133+
this.storage = storage;
134+
}
135+
136+
private static ConfigDef configDef() {
137+
int orderInGroup = 0;
138+
return new ConfigDef()
139+
.define(
140+
SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG,
141+
ConfigDef.Type.STRING,
142+
"${routedByValue}",
143+
new ConfigDef.NonEmptyString(),
144+
ConfigDef.Importance.HIGH,
145+
SUCCESS_ROUTE_TOPIC_REPLACEMENT_DOC,
146+
null,
147+
orderInGroup,
148+
ConfigDef.Width.NONE,
149+
SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG)
150+
.define(
151+
SUCCESS_ROUTE_TOPIC_REGEX_CONFIG,
152+
ConfigDef.Type.STRING,
153+
"(?<routedByValue>.*)",
154+
new ConfigDef.NonEmptyString(),
155+
ConfigDef.Importance.HIGH,
156+
SUCCESS_ROUTE_TOPIC_REGEX_DOC,
157+
null,
158+
orderInGroup,
159+
ConfigDef.Width.NONE,
160+
SUCCESS_ROUTE_TOPIC_REGEX_CONFIG
161+
)
162+
.define(
163+
FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG,
164+
ConfigDef.Type.STRING,
165+
"${routedByValue}",
166+
new ConfigDef.NonEmptyString(),
167+
ConfigDef.Importance.HIGH,
168+
FAILURE_ROUTE_TOPIC_REPLACEMENT_DOC,
169+
null,
170+
orderInGroup,
171+
ConfigDef.Width.NONE,
172+
FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG)
173+
.define(
174+
FAILURE_ROUTE_TOPIC_REGEX_CONFIG,
175+
ConfigDef.Type.STRING,
176+
"(?<routedByValue>.*)",
177+
new ConfigDef.NonEmptyString(),
178+
ConfigDef.Importance.HIGH,
179+
FAILURE_ROUTE_TOPIC_REGEX_DOC,
180+
null,
181+
orderInGroup,
182+
ConfigDef.Width.NONE,
183+
FAILURE_ROUTE_TOPIC_REGEX_CONFIG
184+
);
185+
}
186+
}

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/DeleteCleanupPolicyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package io.streamthoughts.kafka.connect.filepulse.fs.clean;
2121

22-
import io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalFileStorage;
22+
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFileStorage;
2323
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2424
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
2525
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;

0 commit comments

Comments
 (0)