Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions ratis-server/dev-support/findbugsExcludeFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
limitations under the License.
-->
<FindBugsFilter>
<Match>
<Class name="org.apache.ratis.server.impl.LeaderElection"/>
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Match>
<Match>
<Class name="org.apache.ratis.server.impl.LeaderStateImpl"/>
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
Expand Down Expand Up @@ -51,10 +47,6 @@
<Class name="org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream"/>
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Match>
<Match>
<Class name="org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogReader"/>
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Match>
<Match>
<Class name="org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog$Builder"/>
<Bug pattern="EI_EXPOSE_REP2"/>
Expand All @@ -67,10 +59,6 @@
<Class name="org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker$WriteLog"/>
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Match>
<Match>
<Class name="org.apache.ratis.server.storage.FileChunkReader"/>
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Match>
<Match>
<Class name="org.apache.ratis.server.storage.RaftStorageImpl"/>
<Bug pattern="EI_EXPOSE_REP"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
* Ongaro, D. Consensus: Bridging Theory and Practice. PhD thesis, Stanford University, 2014.
* Available at https://github.com/ongardie/dissertation
*/
class LeaderElection implements Runnable {
final class LeaderElection implements Runnable {
public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);

interface ServerInterface {
Expand Down Expand Up @@ -306,25 +306,33 @@ public String toString() {
private final boolean skipPreVote;
private final ConfAndTerm round0;

LeaderElection(RaftServerImpl server, boolean force) {
this(ServerInterface.get(server), force);
static LeaderElection newInstance(RaftServerImpl server, boolean force) {
return newInstance(ServerInterface.get(server), force);
}

LeaderElection(ServerInterface server, boolean force) {
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
.setThreadGroup(server.getThreadGroup()).build();
this.server = server;
this.skipPreVote = force || !server.isPreVoteEnabled();
static LeaderElection newInstance(ServerInterface server, boolean force) {
String name = ServerStringUtils.generateUnifiedName(server.getMemberId(), LeaderElection.class)
+ COUNT.incrementAndGet();
try {
// increase term of the candidate in advance if it's forced to election
this.round0 = force ? server.initElection(Phase.ELECTION) : null;
final ConfAndTerm round0 = force ? server.initElection(Phase.ELECTION) : null;
return new LeaderElection(name, server, force, round0);
} catch (IOException e) {
throw new IllegalStateException(name + ": Failed to initialize election", e);
}
}


private LeaderElection(String name, ServerInterface server, boolean force, ConfAndTerm round0) {
this.name = name;
this.lifeCycle = new LifeCycle(this);
this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
.setThreadGroup(server.getThreadGroup()).build();
this.server = server;
this.skipPreVote = force || !server.isPreVoteEnabled();
this.round0 = round0;
}

void start() {
startIfNew(daemon::start);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void startLeaderElection(RaftServerImpl server, boolean force) {
if (pauseLeaderElection.get()) {
return;
}
updateAndGet(leaderElection, new LeaderElection(server, force)).start();
updateAndGet(leaderElection, LeaderElection.newInstance(server, force)).start();
}

void setLeaderElectionPause(boolean pause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public InstallSnapshotRequestProto next() {
final FileInfo info = snapshot.getFiles().get(fileIndex);
try {
if (current == null) {
current = new FileChunkReader(info, getRelativePath.apply(info));
current = FileChunkReader.newInstance(info, getRelativePath.apply(info));
}
final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
if (chunk.getDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void init() throws IOException {
state.open();
boolean initSuccess = false;
try {
reader = new SegmentedRaftLogReader(logFile, maxOpSize, raftLogMetrics);
reader = SegmentedRaftLogReader.newInstance(logFile, maxOpSize, raftLogMetrics);
initSuccess = reader.verifyHeader();
} finally {
if (!initSuccess) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.util.Optional;
import java.util.zip.Checksum;

class SegmentedRaftLogReader implements Closeable {
final class SegmentedRaftLogReader implements Closeable {
static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogReader.class);
/**
* InputStream wrapper that keeps track of the current stream position.
Expand Down Expand Up @@ -150,10 +150,18 @@ public long skip(long amt) throws IOException {
private final SegmentedRaftLogMetrics raftLogMetrics;
private final SizeInBytes maxOpSize;

SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) throws IOException {
static SegmentedRaftLogReader newInstance(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics)
throws IOException {
final LimitedInputStream limiter = new LimitedInputStream(new BufferedInputStream(FileUtils.newInputStream(file)));
final DataInputStream in = new DataInputStream(limiter);
return new SegmentedRaftLogReader(file, maxOpSize, raftLogMetrics, limiter, in);
}

private SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics,
LimitedInputStream limiter, DataInputStream in) {
this.file = file;
this.limiter = new LimitedInputStream(new BufferedInputStream(FileUtils.newInputStream(file)));
in = new DataInputStream(limiter);
this.limiter = limiter;
this.in = in;
checksum = new PureJavaCrc32C();
this.maxOpSize = maxOpSize;
this.raftLogMetrics = raftLogMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.security.MessageDigest;

/** Read {@link FileChunkProto}s from a file. */
public class FileChunkReader implements Closeable {
public final class FileChunkReader implements Closeable {
private final FileInfo info;
private final Path relativePath;
private final InputStream in;
Expand All @@ -51,17 +51,27 @@ public class FileChunkReader implements Closeable {
* @param relativePath the relative path of the file.
* @throws IOException if it failed to open the file.
*/
public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
this.info = info;
this.relativePath = relativePath;
public static FileChunkReader newInstance(FileInfo info, Path relativePath) throws IOException {
final File f = info.getPath().toFile();
final InputStream in;
final MessageDigest digester;

if (info.getFileDigest() == null) {
digester = MD5FileUtil.newMD5();
this.in = new DigestInputStream(FileUtils.newInputStream(f), digester);
in = new DigestInputStream(FileUtils.newInputStream(f), digester);
} else {
digester = null;
this.in = FileUtils.newInputStream(f);
in = FileUtils.newInputStream(f);
}

return new FileChunkReader(info, relativePath, in, digester);
}

private FileChunkReader(FileInfo info, Path relativePath, InputStream in, MessageDigest digester) {
this.info = info;
this.relativePath = relativePath;
this.in = in;
this.digester = digester;
}

static ByteString readFileChunk(int chunkLength, InputStream in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ public void testLeaderElectionMetrics() throws IOException, InterruptedException
@Test
public void testImmediatelyRevertedToFollower() {
RaftServerImpl server = createMockServer(true);
LeaderElection subject = new LeaderElection(server, false);
LeaderElection subject = LeaderElection.newInstance(server, false);

try {
subject.startInForeground();
Expand All @@ -606,7 +606,7 @@ public void testImmediatelyRevertedToFollower() {
@Test
public void testShutdownBeforeStart() {
RaftServerImpl server = createMockServer(false);
LeaderElection subject = new LeaderElection(server, false);
LeaderElection subject = LeaderElection.newInstance(server, false);

try {
subject.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void runTestVoterWithEmptyLog(boolean expectToPass, TermIndex... lastEntries) {
for(int i = 0; i < lastEntries.length; i++) {
map.put(peers.get(i).getId(), lastEntries[i]);
}
final LeaderElection election = new LeaderElection(newServerInterface(expectToPass, map), false);
final LeaderElection election = LeaderElection.newInstance(newServerInterface(expectToPass, map), false);
election.startInForeground();
}

Expand Down
Loading