1919 */
2020package org .neo4j .graphalgo .beta .filter ;
2121
22+ import com .carrotsearch .hppc .AbstractIterator ;
2223import org .apache .commons .lang3 .mutable .MutableInt ;
2324import org .eclipse .collections .api .block .function .primitive .LongToLongFunction ;
2425import org .neo4j .graphalgo .NodeLabel ;
2526import org .neo4j .graphalgo .annotation .ValueClass ;
2627import org .neo4j .graphalgo .api .DefaultValue ;
2728import org .neo4j .graphalgo .api .GraphStore ;
29+ import org .neo4j .graphalgo .api .IdMapping ;
2830import org .neo4j .graphalgo .api .NodeMapping ;
2931import org .neo4j .graphalgo .api .NodeProperties ;
3032import org .neo4j .graphalgo .api .NodeProperty ;
4547import org .neo4j .graphalgo .core .utils .mem .AllocationTracker ;
4648import org .neo4j .graphalgo .core .utils .paged .HugeLongArray ;
4749import org .neo4j .graphalgo .core .utils .paged .HugeMergeSort ;
48- import org .neo4j .graphalgo .core .utils .paged .SparseLongArray ;
4950import org .neo4j .graphalgo .core .utils .partition .Partition ;
5051import org .neo4j .graphalgo .core .utils .partition .PartitionUtils ;
5152
5253import java .util .Collection ;
54+ import java .util .Iterator ;
5355import java .util .Map ;
5456import java .util .concurrent .ExecutorService ;
5557import java .util .stream .Collectors ;
5658
59+ import static org .neo4j .graphalgo .core .utils .paged .SparseLongArray .SUPER_BLOCK_SHIFT ;
5760import static org .neo4j .graphalgo .utils .StringFormatting .formatWithLocale ;
5861
5962final class NodesFilter {
@@ -78,6 +81,13 @@ static FilteredNodes filterNodes(
7881 LongToLongFunction originalIdFunction ;
7982 LongToLongFunction internalIdFunction ;
8083
84+ // Partitions over the id space are created depending on the id map
85+ // implementation. For the BitIdMap, we need to make sure that the
86+ // ranges of original ids in each partition are aligned with the
87+ // block size used for creating the BitIdMap. For the regular IdMap,
88+ // we use range partitioning.
89+ Iterator <Partition > partitions ;
90+
8191 var inputNodes = graphStore .nodes ();
8292
8393 var nodesBuilderBuilder = GraphFactory .initNodesBuilder ()
@@ -112,39 +122,42 @@ static FilteredNodes filterNodes(
112122 // We signal the nodes builder to use the block-based
113123 // BitIdMap builder.
114124 nodesBuilderBuilder .hasDisjointPartitions (true );
125+ // Create partitions that are aligned to the blocks that
126+ // original ids belong to. We must guarantee, that no two
127+ // partitions contain ids that belong to the same block.
128+ partitions = PartitionUtils .blockAlignedPartitioning (
129+ sortedOriginalIds ,
130+ SUPER_BLOCK_SHIFT ,
131+ partition -> partition
132+ );
133+
115134 progressLogger .finishSubTask ("Prepare node ids" );
116135 } else {
117136 // If we need to construct a regular IdMap, we can just
118137 // delegate to the input node id mapping and use the
119138 // internal id as given.
120139 originalIdFunction = inputNodes ::toOriginalNodeId ;
121140 internalIdFunction = (id ) -> id ;
141+
142+ partitions = PartitionUtils
143+ .rangePartition (concurrency , graphStore .nodeCount (), partition -> partition )
144+ .iterator ();
122145 }
123146
124147 var nodesBuilder = nodesBuilderBuilder .build ();
125148
126- var nodeFilterTasks = PartitionUtils .numberAlignedPartitioning (
127- concurrency ,
128- graphStore .nodeCount (),
129- // We need to make sure to align the partition size
130- // with the block size in the SLA, which is the main
131- // data structure of the BitIdMap. If partition sizes
132- // are unaligned, wrong internal ids will be generated
133- // during import.
134- SparseLongArray .SUPER_BLOCK_SIZE ,
135- partition -> new NodeFilterTask (
136- partition ,
137- expression ,
138- graphStore ,
139- originalIdFunction ,
140- internalIdFunction ,
141- nodesBuilder ,
142- progressLogger
143- )
149+ var tasks = NodeFilterTask .of (
150+ graphStore ,
151+ expression ,
152+ partitions ,
153+ originalIdFunction ,
154+ internalIdFunction ,
155+ nodesBuilder ,
156+ progressLogger
144157 );
145158
146159 progressLogger .startSubTask ("Nodes" ).reset (graphStore .nodeCount ());
147- ParallelUtil .runWithConcurrency (concurrency , nodeFilterTasks , executorService );
160+ ParallelUtil .runWithConcurrency (concurrency , tasks , executorService );
148161 progressLogger .finishSubTask ("Nodes" );
149162
150163 var nodeMappingAndProperties = nodesBuilder .build ();
@@ -154,7 +167,6 @@ static FilteredNodes filterNodes(
154167 var filteredNodePropertyStores = filterNodeProperties (
155168 filteredNodeMapping ,
156169 graphStore ,
157- executorService ,
158170 concurrency ,
159171 progressLogger
160172 );
@@ -196,7 +208,6 @@ private static HugeLongArray sortOriginalIds(
196208 private static Map <NodeLabel , NodePropertyStore > filterNodeProperties (
197209 NodeMapping filteredNodeMapping ,
198210 GraphStore inputGraphStore ,
199- ExecutorService executorService ,
200211 int concurrency ,
201212 ProgressLogger progressLogger
202213 ) {
@@ -220,7 +231,6 @@ private static Map<NodeLabel, NodePropertyStore> filterNodeProperties(
220231 nodeLabel ,
221232 propertyKeys ,
222233 concurrency ,
223- executorService ,
224234 progressLogger
225235 );
226236
@@ -234,11 +244,10 @@ private static Map<NodeLabel, NodePropertyStore> filterNodeProperties(
234244
235245 private static NodePropertyStore createNodePropertyStore (
236246 GraphStore inputGraphStore ,
237- NodeMapping filteredMapping ,
247+ IdMapping filteredMapping ,
238248 NodeLabel nodeLabel ,
239249 Collection <String > propertyKeys ,
240250 int concurrency ,
241- ExecutorService executorService ,
242251 ProgressLogger progressLogger
243252 ) {
244253 var builder = NodePropertyStore .builder ();
@@ -382,6 +391,35 @@ private static final class NodeFilterTask implements Runnable {
382391 private final LongToLongFunction internalIdFunction ;
383392 private final NodesBuilder nodesBuilder ;
384393
394+ static Iterator <NodeFilterTask > of (
395+ GraphStore graphStore ,
396+ Expression expression ,
397+ Iterator <Partition > partitions ,
398+ LongToLongFunction originalIdFunction ,
399+ LongToLongFunction internalIdFunction ,
400+ NodesBuilder nodesBuilder ,
401+ ProgressLogger progressLogger
402+ ) {
403+ return new AbstractIterator <>() {
404+ @ Override
405+ protected NodeFilterTask fetch () {
406+ if (!partitions .hasNext ()) {
407+ return done ();
408+ }
409+
410+ return new NodeFilterTask (
411+ partitions .next (),
412+ expression ,
413+ graphStore ,
414+ originalIdFunction ,
415+ internalIdFunction ,
416+ nodesBuilder ,
417+ progressLogger
418+ );
419+ }
420+ };
421+ }
422+
385423 private NodeFilterTask (
386424 Partition partition ,
387425 Expression expression ,
0 commit comments