Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/cpp_integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,31 @@ jobs:
check-latest: false
- name: Compile & Install Celeborn Java
run: build/mvn clean install -DskipTests
- name: Run Java-Cpp Hybrid Integration Test
- name: Run Java-Write Cpp-Read Hybrid Integration Test (NONE Decompression)
run: |
build/mvn -pl worker \
test-compile exec:java \
-Dexec.classpathScope="test" \
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithNONE" \
-Dexec.args="-XX:MaxDirectMemorySize=2G"
- name: Run Java-Cpp Hybrid Integration Test (LZ4 Decompression)
- name: Run Java-Write Cpp-Read Hybrid Integration Test (LZ4 Decompression)
run: |
build/mvn -pl worker \
test-compile exec:java \
-Dexec.classpathScope="test" \
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithLZ4" \
-Dexec.args="-XX:MaxDirectMemorySize=2G"
- name: Run Java-Cpp Hybrid Integration Test (ZSTD Decompression)
- name: Run Java-Write Cpp-Read Hybrid Integration Test (ZSTD Decompression)
run: |
build/mvn -pl worker \
test-compile exec:java \
-Dexec.classpathScope="test" \
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.JavaWriteCppReadTestWithZSTD" \
-Dexec.args="-XX:MaxDirectMemorySize=2G"
- name: Run Cpp-Write Java-Read Hybrid Integration Test (NONE Compression)
run: |
build/mvn -pl worker \
test-compile exec:java \
-Dexec.classpathScope="test" \
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithNONE" \
-Dexec.args="-XX:MaxDirectMemorySize=2G"
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.PushData;
import org.apache.celeborn.common.network.protocol.SerdeVersion;
import org.apache.celeborn.common.network.protocol.TransportMessage;
import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.protocol.MessageType;
Expand Down Expand Up @@ -528,7 +529,7 @@ public Optional<PartitionLocation> regionStart(
public Optional<PartitionLocation> revive(
int shuffleId, int mapId, int attemptId, PartitionLocation location)
throws CelebornIOException {
Set<Integer> mapIds = new HashSet<>();
List<Integer> mapIds = new ArrayList<>();
mapIds.add(mapId);
List<ReviveRequest> requests = new ArrayList<>();
ReviveRequest req =
Expand All @@ -543,7 +544,7 @@ public Optional<PartitionLocation> revive(
requests.add(req);
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests, SerdeVersion.V1),
conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
// per partitionKey only serve single PartitionLocation in Client Cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
numPartitions,
() ->
lifecycleManagerRef.askSync(
RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions),
new RegisterShuffle(shuffleId, numMappers, numPartitions, SerdeVersion.V1),
conf.clientRpcRegisterShuffleAskTimeout(),
rpcMaxRetries,
rpcRetryWait,
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
ClassTag$.MODULE$.apply(RegisterShuffleResponse.class)));
}

@Override
Expand Down Expand Up @@ -593,7 +593,7 @@ public PartitionLocation registerMapPartitionTask(
partitionId,
isSegmentGranularityVisible),
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
ClassTag$.MODULE$.apply(RegisterShuffleResponse.class)));

return partitionLocationMap.get(partitionId);
}
Expand Down Expand Up @@ -709,23 +709,18 @@ public boolean reportBarrierTaskFailure(int appShuffleId, String appShuffleIdent
}

private ConcurrentHashMap<Integer, PartitionLocation> registerShuffleInternal(
int shuffleId,
int numMappers,
int numPartitions,
Callable<PbRegisterShuffleResponse> callable)
int shuffleId, int numMappers, int numPartitions, Callable<RegisterShuffleResponse> callable)
throws CelebornIOException {
int numRetries = registerShuffleMaxRetries;
StatusCode lastFailedStatusCode = null;
while (numRetries > 0) {
try {
PbRegisterShuffleResponse response = callable.call();
StatusCode respStatus = StatusCode.fromValue(response.getStatus());
RegisterShuffleResponse response = callable.call();
StatusCode respStatus = response.status();
if (StatusCode.SUCCESS.equals(respStatus)) {
ConcurrentHashMap<Integer, PartitionLocation> result = JavaUtils.newConcurrentHashMap();
Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
PbSerDeUtils.fromPbPackedPartitionLocationsPair(
response.getPackedPartitionLocationsPair());
for (PartitionLocation location : locations._1) {
PartitionLocation[] locations = response.partitionLocations();
for (PartitionLocation location : locations) {
pushExcludedWorkers.remove(location.hostAndPushPort());
if (location.hasPeer()) {
pushExcludedWorkers.remove(location.getPeer().hostAndPushPort());
Expand Down Expand Up @@ -900,43 +895,43 @@ Map<Integer, Integer> reviveBatch(
oldLocMap.put(req.partitionId, req.loc);
}
try {
PbChangeLocationResponse response =
ChangeLocationResponse response =
lifecycleManagerRef.askSync(
Revive$.MODULE$.apply(shuffleId, mapIds, requests),
Revive$.MODULE$.apply(
shuffleId, new ArrayList<>(mapIds), new ArrayList<>(requests), SerdeVersion.V1),
conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
ClassTag$.MODULE$.apply(ChangeLocationResponse.class));

for (int i = 0; i < response.getEndedMapIdCount(); i++) {
int mapId = response.getEndedMapId(i);
for (int i = 0; i < response.endedMapIds().size(); i++) {
int mapId = response.endedMapIds().get(i);
mapperEndMap.computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet()).add(mapId);
}

for (int i = 0; i < response.getPartitionInfoCount(); i++) {
PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(i);
int partitionId = partitionInfo.getPartitionId();
int statusCode = partitionInfo.getStatus();
if (partitionInfo.getOldAvailable()) {
for (Map.Entry<Integer, Tuple3<StatusCode, Boolean, PartitionLocation>> entry :
response.newLocs().entrySet()) {
int partitionId = entry.getKey();
StatusCode statusCode = entry.getValue()._1();
if (entry.getValue()._2() != null) {
PartitionLocation oldLoc = oldLocMap.get(partitionId);
// Currently, revive only check if main location available, here won't remove peer loc.
pushExcludedWorkers.remove(oldLoc.hostAndPushPort());
}

if (StatusCode.SUCCESS.getValue() == statusCode) {
PartitionLocation loc =
PbSerDeUtils.fromPbPartitionLocation(partitionInfo.getPartition());
if (StatusCode.SUCCESS == statusCode) {
PartitionLocation loc = entry.getValue()._3();
partitionLocationMap.put(partitionId, loc);
pushExcludedWorkers.remove(loc.hostAndPushPort());
if (loc.hasPeer()) {
pushExcludedWorkers.remove(loc.getPeer().hostAndPushPort());
}
} else if (StatusCode.STAGE_ENDED.getValue() == statusCode) {
} else if (StatusCode.STAGE_ENDED == statusCode) {
stageEndShuffleSet.add(shuffleId);
return results;
} else if (StatusCode.SHUFFLE_UNREGISTERED.getValue() == statusCode) {
} else if (StatusCode.SHUFFLE_UNREGISTERED == statusCode) {
logger.error("SHUFFLE_NOT_REGISTERED!");
return null;
}
results.put(partitionId, statusCode);
results.put(partitionId, (int) (statusCode.getValue()));
}

return results;
Expand Down Expand Up @@ -1806,7 +1801,8 @@ private void mapEndInternal(
pushState.getFailedBatches(),
numPartitions,
crc32PerPartition,
bytesPerPartition),
bytesPerPartition,
SerdeVersion.V1),
rpcMaxRetries,
rpcRetryWait,
ClassTag$.MODULE$.apply(MapperEndResponse.class));
Expand Down
Loading
Loading