Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -842,10 +842,15 @@ private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request
newExceptionReply(request, generateNotLeaderException())));
}

private CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.streamEndOfRequestAsync(request))
.orElse(null);
.map(ls -> ls.streamEndOfRequestAsync(request))
.orElseGet(() -> {
final CompletableFuture<RaftClientRequest> errorF = new CompletableFuture<>();
errorF.completeExceptionally(
new Exception("Unexpected null encountered, while receiving end of stream request."));
return errorF;
});
}

CompletableFuture<RaftClientReply> processQueryFuture(
Expand Down Expand Up @@ -1560,21 +1565,30 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
return reply;
}

Optional<RaftPeerProto> leaderPeerInfo = null;
RaftPeerProto leaderPeerInfo = null;
if (request.hasLastRaftConfigurationLogEntryProto()) {
List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
.getPeersList();
leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
Preconditions.assertTrue(leaderPeerInfo.isPresent());
Optional<RaftPeerProto> optionalLeaderPeerInfo = peerList.stream()
.filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
leaderPeerInfo = (optionalLeaderPeerInfo.isPresent()) ? optionalLeaderPeerInfo.get() : null;
}

// For the cases where RaftConf is empty on newly started peer with
// empty peer list, we retrieve leader info from
// installSnapShotRequestProto.
RoleInfoProto roleInfoProto =
getRaftConf().getPeer(state.getLeaderId()) == null ?
getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
getRoleInfoProto();
RoleInfoProto roleInfoProto;
RaftPeer raftPeer = getRaftConf().getPeer(state.getLeaderId());
if (raftPeer == null) {
if (leaderPeerInfo != null) {
roleInfoProto = getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo));
} else {
throw new IOException("Leader peer info is unknown.");
}
} else {
roleInfoProto = getRoleInfoProto();
}

// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
Expand Down Expand Up @@ -1633,7 +1647,7 @@ private CompletableFuture<Message> replyPendingRequest(
// update the retry cache
final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
if (getInfo().isLeader()) {
Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(),
Preconditions.assertTrue(!cacheEntry.isCompletedNormally(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove cacheEntry != null ?

"retry cache entry should be pending: %s", cacheEntry);
}
if (cacheEntry.isFailed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,11 @@ public String toString() {
}

CacheEntry getOrCreateEntry(ClientInvocationId key) {
final CacheEntry entry;
try {
entry = cache.get(key, () -> new CacheEntry(key));
return cache.get(key, () -> new CacheEntry(key));
} catch (ExecutionException e) {
throw new IllegalStateException(e);
}
return entry;
}

CacheEntry refreshEntry(CacheEntry newEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DivisionInfo;
Expand All @@ -44,6 +41,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class RaftServerTestUtil {
Expand Down Expand Up @@ -156,6 +154,10 @@ public static DataStreamMap newDataStreamMap(Object name) {
return new DataStreamMapImpl(name);
}

public static CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftServer.Division server, RaftClientRequest request) {
return ((RaftServerImpl)server).streamEndOfRequestAsync(request);
}

public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
Assert.assertNotNull(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ public static void assertFailure(RetryCache cache, LogEntryProto logEntry, boole
}
}

public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
getOrCreateEntry(server.getRetryCache(), invocationId);
public static RetryCache.Entry getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
return getOrCreateEntryImpl(server.getRetryCache(), invocationId);
}

private static RetryCache.Entry getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) {
public static RetryCache.Entry getOrCreateEntry(RetryCache retryCache, ClientInvocationId invocationId) {
return getOrCreateEntryImpl(retryCache, invocationId);
}

private static RetryCache.Entry getOrCreateEntryImpl(RetryCache cache, ClientInvocationId invocationId) {
return ((RetryCacheImpl)cache).getOrCreateEntry(invocationId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.StreamException;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
Expand All @@ -44,9 +45,6 @@
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -74,6 +72,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
{
Expand Down Expand Up @@ -311,9 +310,50 @@ void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOExce
}
}

@Test
public void TestStreamEndOfRequestAsync() throws Exception {
runWithNewCluster(1, this::runTestStreamEndOfRequestAsync);
}

void runTestStreamEndOfRequestAsync(MiniRaftClusterWithGrpc cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
final RaftGroupId leaderGroupId = leader.getGroup().getGroupId();
final RaftClient client = cluster.createClient();
final AtomicLong seqNum = new AtomicLong();
final RaftClientRequest clientRequest = newRaftClientRequest(client, leaderId, seqNum.incrementAndGet(),
RaftClientRequest.messageStreamRequestType(12, 12, true));

// Leader completes exceptionally, because there is no such stream
// Creating realistic stream is complex, since streams are created by clients, but
// this function tests server functionality.
CompletableFuture<RaftClientRequest> fRequest = RaftServerTestUtil.streamEndOfRequestAsync(leader, clientRequest);
Assert.assertNotNull(fRequest);
Assert.assertTrue(fRequest.isCompletedExceptionally());
fRequest.exceptionally(e -> {
Assert.assertSame(e.getCause().getClass(), StreamException.class);
return clientRequest;
});

// On non leader, request should fail because only leaders handle this kind of requests
RaftServer server = cluster.putNewServer(RaftPeerId.getRaftPeerId("Server 21"), leader.getGroup(), false);
fRequest = RaftServerTestUtil.streamEndOfRequestAsync(server.getDivision(leaderGroupId), clientRequest);
Assert.assertNotNull(fRequest);
Assert.assertTrue(fRequest.isCompletedExceptionally());
fRequest.exceptionally(e -> {
Assert.assertSame(e.getCause().getClass(), Exception.class);
return clientRequest;
});
}

static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) {
return newRaftClientRequest(client, serverId, seqNum, RaftClientRequest.writeRequestType());
}

static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum,
RaftClientRequest.Type type) {
final SimpleMessage m = new SimpleMessage("m" + seqNum);
return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,
RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
type, ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
Expand Down Expand Up @@ -636,6 +634,29 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) {
throw ex;
}

/**
* Verifies that getOrCreateEntry function creates cache entry in every case and does not return null.
*/
@Test
public void testGetOrCreateEntry() {
final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
final ClientId clientId = ClientId.randomId();
final long invocationId1 = 123456789;
final ClientInvocationId clientInvocationId1 = ClientInvocationId.valueOf(clientId, invocationId1);
RetryCache.Entry cacheEntry1 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1);
Assert.assertNotNull(cacheEntry1);

RetryCache.Entry cacheEntry1Again = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1);
Assert.assertEquals(cacheEntry1.toString(), cacheEntry1Again.toString());

final long invocationId2 = 987654321;
final ClientInvocationId clientInvocationId2 = ClientInvocationId.valueOf(clientId, invocationId2);
RetryCache.Entry cacheEntry2 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId2);
Assert.assertNotNull(cacheEntry2);

Assert.assertNotEquals(cacheEntry1.toString(), cacheEntry2.toString());
}

static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
final Thread t = new Thread(() -> {
try {
Expand Down