Skip to content

Commit d976dee

Browse files
authored
Merge pull request #1346 from jjaderberg/1.3-pregel-docs
Pregel api doc
2 parents d3f8fb3 + 750693d commit d976dee

File tree

7 files changed

+266
-28
lines changed

7 files changed

+266
-28
lines changed

algo/src/main/java/org/neo4j/graphalgo/beta/pregel/examples/ConnectedComponentsPregel.java

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,36 +27,24 @@
2727
public class ConnectedComponentsPregel implements PregelComputation {
2828

2929
@Override
30-
public void compute(PregelContext pregel, final long nodeId, Queue<Double> messages) {
31-
if (pregel.isInitialSuperStep()) {
32-
// Incremental computation
33-
double currentValue = pregel.getNodeValue(nodeId);
34-
if (Double.compare(currentValue, pregel.getInitialNodeValue()) == 0) {
35-
pregel.sendMessages(nodeId, nodeId);
36-
pregel.setNodeValue(nodeId, nodeId);
37-
} else {
38-
pregel.sendMessages(nodeId, currentValue);
39-
}
40-
} else {
41-
long newComponentId = (long) pregel.getNodeValue(nodeId);
42-
boolean hasChanged = false;
43-
44-
if (messages != null) {
45-
Double message;
46-
while ((message = messages.poll()) != null) {
47-
if (message < newComponentId) {
48-
newComponentId = message.longValue();
49-
hasChanged = true;
30+
public void compute(PregelContext context, final long nodeId, Queue<Double> messages) {
31+
double oldComponentId = context.getNodeValue(nodeId);
32+
double newComponentId = oldComponentId;
33+
if (context.isInitialSuperStep()) {
34+
// In the first round, every node uses its own id as the component id
35+
newComponentId = nodeId;
36+
} else if (messages != null && !messages.isEmpty()){
37+
Double nextComponentId;
38+
while ((nextComponentId = messages.poll()) != null) {
39+
if (nextComponentId.longValue() < newComponentId) {
40+
newComponentId = nextComponentId.longValue();
5041
}
5142
}
52-
}
53-
54-
if (hasChanged) {
55-
pregel.setNodeValue(nodeId, newComponentId);
56-
pregel.sendMessages(nodeId, newComponentId);
57-
}
43+
}
5844

59-
pregel.voteToHalt(nodeId);
45+
if (newComponentId != oldComponentId) {
46+
context.setNodeValue(nodeId, newComponentId);
47+
context.sendMessages(nodeId, newComponentId);
6048
}
6149
}
6250
}

algo/src/test/java/org/neo4j/graphalgo/beta/pregel/examples/WeaklyConnectedComponentsPregelTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.neo4j.graphalgo.beta.pregel.examples;
2121

2222
import org.junit.jupiter.api.Test;
23-
import org.neo4j.graphalgo.AlgoTestBase;
2423
import org.neo4j.graphalgo.Orientation;
2524
import org.neo4j.graphalgo.TestSupport;
2625
import org.neo4j.graphalgo.beta.pregel.ImmutablePregelConfig;

doc/asciidoc/algorithms/algorithms-intro.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ This chapter is divided into the following sections:
3131
* <<algorithms-path-finding>>
3232
* <<algorithms-linkprediction>>
3333
* <<algorithms-auxiliary>>
34+
* <<algorithms-pregel-api>>
3435

3536
include::algorithms-syntax.adoc[leveloffset=+1]
3637

@@ -47,3 +48,5 @@ include::algorithms-link-prediction.adoc[leveloffset=+1]
4748
include::node-embeddings.adoc[leveloffset=+1]
4849

4950
include::algorithms-auxiliary.adoc[leveloffset=+1]
51+
52+
include::algorithms-pregel-api.adoc[leveloffset=+1]
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
[[algorithms-pregel-api]]
2+
= Pregel API
3+
4+
[abstract]
5+
--
6+
This chapter provides explanations and examples for using the Pregel API in the Neo4j Graph Data Science library.
7+
--
8+
9+
[[algorithms-pregel-api-intro]]
10+
== Introduction
11+
12+
Pregel is a vertex-centric computation model in which you define your own algorithms via a user-defined _compute_ function.
13+
Node values can be updated within the compute function and represent the algorithm result.
14+
The input graph contains default node values or node values from a graph projection.
15+
16+
The compute function is executed in multiple iterations, also called _super-steps_.
17+
In each super-step the compute function is executed for each node in the graph.
18+
Within that function, a node can receive messages from its neighbor nodes.
19+
Based on the received messages and its currently stored value, a node can compute a new value.
20+
A node can also send messages to all of its neighbors; these messages are received in the next super-step.
21+
The algorithm terminates after a fixed number of super-steps or if no messages are being sent between nodes.
22+
23+
A Pregel computation is executed in parallel.
24+
Each thread executes the compute function for a batch of nodes.
25+
26+
For more information about Pregel, have a look at https://kowshik.github.io/JPregel/pregel_paper.pdf.
27+
28+
To implement your own Pregel algorithm, the Graph Data Science library provides a Java API, which is described below.
29+
30+
For an example of how to expose a custom Pregel computation via a Neo4j procedure, have a look at the https://github.com/neo-technology/graph-analytics/tree/master/public/examples/PregelK1Coloring[K1-Coloring example].
31+
32+
33+
== Pregel API
34+
.Initializing Pregel
35+
[source, java]
36+
----
37+
package org.neo4j.graphalgo.beta.pregel;
38+
39+
public final class Pregel {
40+
// constructing an instance of Pregel
41+
public static Pregel withDefaultNodeValues(
42+
final Graph graph,
43+
final PregelConfig config,
44+
final PregelComputation computation,
45+
final int batchSize,
46+
final ExecutorService executor,
47+
final AllocationTracker tracker
48+
) {...}
49+
50+
// running the Pregel instance to get node values as result
51+
public HugeDoubleArray run(final int maxIterations) {...}
52+
}
53+
----
54+
55+
To build a PregelConfig you can use the `ImmutablePregelConfig.builder()`.
56+
57+
.Pregel Config
58+
[opts="header",cols="1,1,1,6"]
59+
|===
60+
| Name | Type | Default Value | Description
61+
| initialNodeValue | Double | -1 | Initial value of the node in the Pregel context.
62+
| isAsynchronous | Boolean | false | Flag indicating if messages can be sent and received in the same super-step.
63+
| relationshipWeightProperty| String | null | Name of the relationship property that represents weight.
64+
| concurrency | Integer | 4 | Concurrency used when executing the Pregel computation.
65+
|===
66+
67+
To implement your own algorithm, you need to implement the `PregelComputation` interface.
68+
69+
.The Pregel computation
70+
[source, java]
71+
----
72+
@FunctionalInterface
73+
public interface PregelComputation {
74+
// specifying the algorithm logic.
75+
void compute(PregelContext context, long nodeId, Queue<Double> messages);
76+
// how relationship weights should be applied on the message
77+
default double applyRelationshipWeight(double nodeValue, double relationshipWeight) { return nodeValue; }
78+
}
79+
----
80+
81+
The compute function takes a context, the id of the node for which the method is being executed, and the messages that were sent to that node.
82+
Using the context and the node id, one can access the current super-step, read and update the node value, send messages or vote to halt the computation.
83+
84+
.The Pregel context
85+
[source, java]
86+
----
87+
public final class PregelContext {
88+
// nodes voting to halt will be inactive and accept no new messages
89+
public void voteToHalt(long nodeId) {...};
90+
// if it's the first iteration
91+
public boolean isInitialSuperStep() {...};
92+
// get the number of the current iteration
93+
public int getSuperstep() {...};
94+
public double getNodeValue(long nodeId) {...};
95+
public void setNodeValue(long nodeId, double value) {...};
96+
// sending a message to the neighbours of a node
97+
public void sendMessages(long nodeId, double message) {...};
98+
public int getDegree(long nodeId) {...};
99+
// get the initial node value given by the PregelConfig
100+
public double getInitialNodeValue() {...};
101+
}
102+
----
103+
104+
105+
[[algorithms-pregel-api-example]]
106+
== Example
107+
108+
.The following provides an example of a Pregel computation:
109+
[source, java]
110+
----
111+
import org.neo4j.graphalgo.beta.pregel.PregelComputation;
112+
import org.neo4j.graphalgo.beta.pregel.PregelContext;
113+
114+
import java.util.Queue;
115+
116+
public class ConnectedComponentsPregel implements PregelComputation {
117+
118+
@Override
119+
public void compute(PregelContext context, long nodeId, Queue<Double> messages) {
120+
// get the current componentId for the node from the context
121+
// if we are on the first iteration, the value is the default value from the PregelConfig
122+
// which we do not use
123+
double oldComponentId = context.getNodeValue(nodeId);
124+
double newComponentId = oldComponentId;
125+
if (context.isInitialSuperStep()) {
126+
// In the first round, we use the nodeId as the component id instead of the default -1
127+
newComponentId = nodeId;
128+
// need to check if there are any messages for this node
129+
} else if (messages != null && !messages.isEmpty()){
130+
// the componentId is updated to the smallest componentId of its neighbors including itself
131+
Double nextComponentId;
132+
while ((nextComponentId = messages.poll()) != null) {
133+
if (nextComponentId.longValue() < newComponentId) {
134+
newComponentId = nextComponentId.longValue();
135+
}
136+
}
137+
}
138+
139+
// update the node's componentId in the context and notify its neighbors
140+
if (newComponentId != oldComponentId) {
141+
context.setNodeValue(nodeId, newComponentId);
142+
// send the new componentId to neighbors so that they also can be updated
143+
context.sendMessages(nodeId, newComponentId);
144+
}
145+
}
146+
}
147+
----
148+
149+
.The following runs Pregel, using `ConnectedComponentsPregel`
150+
[source, java]
151+
----
152+
import org.neo4j.graphalgo.core.utils.paged.HugeDoubleArray;
153+
import org.neo4j.graphalgo.core.concurrency.Pools;
154+
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
155+
import org.neo4j.graphalgo.config.AlgoBaseConfig;
156+
157+
import org.neo4j.graphalgo.beta.pregel.ImmutablePregelConfig;
158+
import org.neo4j.graphalgo.beta.pregel.Pregel;
159+
import org.neo4j.graphalgo.beta.pregel.PregelConfig;
160+
import org.neo4j.graphalgo.beta.generator.RandomGraphGenerator;
161+
162+
163+
public class PregelExample {
164+
public static void main(String[] args) {
165+
int batchSize = 10;
166+
int maxIterations = 10;
167+
168+
PregelConfig config = ImmutablePregelConfig.builder()
169+
.isAsynchronous(true)
170+
.build();
171+
172+
Pregel pregelJob = Pregel.withDefaultNodeValues(
173+
// generate a random graph with 100 nodes and average degree 10
174+
RandomGraphGenerator.generate(100, 10),
175+
config,
176+
new ConnectedComponentsPregel(),
177+
batchSize,
178+
// run on the default GDS ExecutorService
179+
Pools.DEFAULT,
180+
// disable memory allocation tracking
181+
AllocationTracker.EMPTY
182+
);
183+
184+
// the index in the nodeValues array is the nodeId from the graph
185+
HugeDoubleArray nodeValues = pregelJob.run(maxIterations);
186+
System.out.println(nodeValues.toString());
187+
}
188+
}
189+
----

doc/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ html {
235235
'//cdnjs.cloudflare.com/ajax/libs/codemirror/5.11.0/codemirror.min.js',
236236
'//cdnjs.cloudflare.com/ajax/libs/codemirror/5.11.0/addon/runmode/runmode.min.js',
237237
'//cdnjs.cloudflare.com/ajax/libs/codemirror/5.11.0/mode/cypher/cypher.min.js',
238+
"//cdnjs.cloudflare.com/ajax/libs/codemirror/5.11.0/mode/clike/clike.min.js",
238239
'javascript/colorize.js',
239240
'javascript/tabs-for-chunked.js',
240241
'javascript/mp-nav.js',

doc/docbook/content-map.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,9 @@
195195
<?dbhtml filename="alpha-algorithms/one-hot-encoding/index.html"?>
196196
</d:tocentry>
197197
</d:tocentry>
198+
<d:tocentry linkend="algorithms-pregel-api">
199+
<?dbhtml filename="algorithms/pregel-api/index.html"?>
200+
</d:tocentry>
198201
</d:tocentry>
199202

200203
<d:tocentry linkend="production-deployment">
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2017-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.graphalgo.doc;
21+
22+
import org.junit.jupiter.api.Test;
23+
import org.neo4j.graphalgo.beta.generator.RandomGraphGenerator;
24+
import org.neo4j.graphalgo.beta.pregel.ImmutablePregelConfig;
25+
import org.neo4j.graphalgo.beta.pregel.Pregel;
26+
import org.neo4j.graphalgo.beta.pregel.PregelConfig;
27+
import org.neo4j.graphalgo.beta.pregel.examples.ConnectedComponentsPregel;
28+
import org.neo4j.graphalgo.config.AlgoBaseConfig;
29+
import org.neo4j.graphalgo.core.concurrency.Pools;
30+
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
31+
import org.neo4j.graphalgo.core.utils.paged.HugeDoubleArray;
32+
33+
public class PregelConnectedComponentsDocExample {
34+
@Test
35+
public void testDoc() {
36+
int batchSize = 10;
37+
int maxIterations = 10;
38+
39+
PregelConfig config = ImmutablePregelConfig.builder()
40+
.isAsynchronous(true)
41+
.build();
42+
43+
Pregel pregelJob = Pregel.withDefaultNodeValues(
44+
RandomGraphGenerator.generate(100, 10),
45+
config,
46+
new ConnectedComponentsPregel(),
47+
batchSize,
48+
Pools.DEFAULT,
49+
AllocationTracker.EMPTY
50+
);
51+
52+
HugeDoubleArray nodeValues = pregelJob.run(maxIterations);
53+
System.out.println(nodeValues.toString());
54+
}
55+
}

0 commit comments

Comments
 (0)