Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 7d9c20f

Browse files
committed
add retries to pagerank parallel tasks
1 parent b3ca776 commit 7d9c20f

File tree

5 files changed

+17
-5
lines changed

5 files changed

+17
-5
lines changed

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/EigenvectorCentralityComputeStep.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,11 @@ void normalizeDeltas() {
108108
// System.out.println(name + "\nnorm: " + l2Norm + "\nbefore: " + Arrays.toString(before) + "\nafter: " + Arrays.toString(deltas));
109109
}
110110

111+
@Override
112+
public String toString() {
113+
return "EigenvectorCentralityComputeStep{" +
114+
"partitionSize=" + partitionSize +
115+
", startNode=" + startNode +
116+
'}';
117+
}
111118
}

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import java.util.*;
3333
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.TimeUnit;
3435
import java.util.stream.LongStream;
3536

3637

@@ -356,17 +357,17 @@ private void run(int iterations) {
356357
System.out.println("-------");
357358
System.out.println("[iteration started] iteration:" + iteration);
358359
// calculate scores
359-
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
360+
ParallelUtil.runWithConcurrency(concurrency, steps, 3, 1, TimeUnit.SECONDS, pool);
360361

361362
// sync scores
362363
System.out.println("[sync scores] iteration:" + iteration + ", steps:" + steps.size());
363364
synchronizeScores();
364-
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
365+
ParallelUtil.runWithConcurrency(concurrency, steps, 3, 1, TimeUnit.SECONDS, pool);
365366

366367
// normalize deltas
367368
System.out.println("[norm computation] iteration:" + iteration + ", steps:" + steps.size());
368369
normalizeDeltas(iteration);
369-
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
370+
ParallelUtil.runWithConcurrency(concurrency, steps, 3, 1, TimeUnit.SECONDS, pool);
370371

371372
System.out.println("[iteration finished] iteration:" + iteration);
372373
System.out.println("-------");

core/src/main/java/org/neo4j/graphalgo/core/utils/ParallelUtil.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,9 @@ private static void runWithConcurrency(
707707
while (ts.hasNext()) {
708708
if (completionService.hasTasks()) {
709709
try {
710+
System.out.println("[ParallelUtil#runWithConcurrency] waiting for task to finish... " + executor);
710711
completionService.awaitNext();
712+
System.out.println("[ParallelUtil#runWithConcurrency] task finished... " + executor);
711713
} catch (ExecutionException e) {
712714
error = Exceptions.chain(error, e.getCause());
713715
} catch (CancellationException ignore) {
@@ -888,6 +890,7 @@ protected void done() {
888890
if (executor instanceof ThreadPoolExecutor) {
889891
pool = (ThreadPoolExecutor) executor;
890892
availableConcurrency = pool.getCorePoolSize();
893+
// availableConcurrency = 2;
891894
int capacity = Math.max(targetConcurrency, availableConcurrency) + 1;
892895
System.out.println("[ParallelUtil#runWithConcurrency] capacity = " + capacity + " [target:" + targetConcurrency + ",available:" + availableConcurrency + "]");
893896
completionQueue = new ArrayBlockingQueue<>(capacity);
@@ -919,7 +922,6 @@ boolean submit(Runnable task) {
919922
executor.execute(future);
920923
return true;
921924
}
922-
System.out.println("[ParallelUtil#runWithConcurrency] unable to submit task " + task);
923925
return false;
924926
}
925927

@@ -942,6 +944,7 @@ private boolean canSubmit() {
942944

943945
if(!canSubmit) {
944946
System.out.println("[ParallelUtil#runWithConcurrency] unable to submit task and pool:" + pool + ", activeCount:" + activeCount + ", availableConcurrency:" + availableConcurrency);
947+
// throw new RuntimeException();
945948
} else {
946949
System.out.println("[ParallelUtil#runWithConcurrency] submitted task and pool:" + pool + ", activeCount:" + activeCount + ", availableConcurrency:" + availableConcurrency);
947950
}

core/src/main/java/org/neo4j/graphalgo/core/utils/Pools.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class Pools {
4242
DEFAULT_CONCURRENCY = definedProcessors;
4343
} else {
4444
DEFAULT_CONCURRENCY = Runtime.getRuntime().availableProcessors();
45+
// DEFAULT_CONCURRENCY = 2;
4546
}
4647
}
4748

tests/src/test/java/org/neo4j/graphalgo/algo/EigenvectorCentralityProcIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void testWriteBackUnderDifferentProperty() throws Exception {
219219
@Test
220220
public void testParallelWriteBack() throws Exception {
221221
runQuery(
222-
"CALL algo.eigenvector('Character', 'INTERACTS_SEASON1', {batchSize:50, concurrency:2, write:true, graph:'"+graphImpl+"', direction: 'BOTH'}) YIELD writeMillis, write, writeProperty, iterations",
222+
"CALL algo.eigenvector('Character', 'INTERACTS_SEASON1', {batchSize:3, concurrency:2, write:true, graph:'"+graphImpl+"', direction: 'BOTH'}) YIELD writeMillis, write, writeProperty, iterations",
223223
row -> {
224224
assertTrue(
225225
"write time not set",

0 commit comments

Comments
 (0)