diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index ae9583620a..b4ca0cb1f5 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Table; import com.google.gson.stream.JsonReader; import org.apache.flume.Event; @@ -43,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; @InterfaceAudience.Private @InterfaceStability.Evolving @@ -53,7 +53,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private final Table headerTable; private TailFile currentFile = null; - private Map tailFiles = Maps.newHashMap(); + private Map tailFiles = new ConcurrentHashMap<>(); private long updateTime; private boolean addByteOffset; private boolean cachePatternMatching; diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 9ecccd7487..c450c8852c 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -25,6 +25,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -81,7 +82,7 @@ public class TaildirSource extends AbstractSource implements private int writePosInterval; private boolean cachePatternMatching; - private List existingInodes = new CopyOnWriteArrayList(); + private List existingInodes = new ArrayList<>(); private List idleInodes = new CopyOnWriteArrayList(); private Long backoffSleepIncrement; private Long maxBackOffSleepInterval; @@ -231,9 +232,12 @@ protected SourceCounter getSourceCounter() { public Status process() { Status status = Status.BACKOFF; try { - existingInodes.clear(); - existingInodes.addAll(reader.updateTailFiles()); - for (long inode : existingInodes) { + final List updatedInodes = reader.updateTailFiles(); + synchronized (existingInodes) { + existingInodes.clear(); + existingInodes.addAll(updatedInodes); + } + for (long inode : updatedInodes) { TailFile tf = reader.getTailFiles().get(inode); if (tf.needTail()) { boolean hasMoreLines = tailFileProcess(tf, true); @@ -304,10 +308,10 @@ private void closeTailFiles() throws IOException, InterruptedException { if (tf.getRaf() != null) { // when file has not closed yet tailFileProcess(tf, false); tf.close(); + idleInodes.remove(inode); logger.info("Closed file: " + tf.getPath() + ", inode: " + inode + ", pos: " + tf.getPos()); } } - idleInodes.clear(); } /** @@ -346,10 +350,8 @@ private void writePosition() { FileWriter writer = null; try { writer = new FileWriter(file); - if (!existingInodes.isEmpty()) { - String json = toPosInfoJson(); - writer.write(json); - } + String json = toPosInfoJson(); + writer.write(json); } catch (Throwable t) { logger.error("Failed writing positionFile", t); sourceCounter.incrementGenericProcessingFail(); @@ -366,9 +368,11 @@ private void writePosition() { private String toPosInfoJson() { @SuppressWarnings("rawtypes") List posInfos = Lists.newArrayList(); - for (Long inode : existingInodes) { - TailFile tf = reader.getTailFiles().get(inode); - posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath())); + synchronized (existingInodes) { + for (long inode : existingInodes) { + TailFile tf = reader.getTailFiles().get(inode); + posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath())); + } } return new Gson().toJson(posInfos); }