3232from graph_pattern import TARGET_VAR
3333from graph_pattern import ASK_VAR
3434from graph_pattern import COUNT_VAR
35+ from graph_pattern import NODE_VAR_SUM
36+ from graph_pattern import EDGE_VAR_COUNT
3537from utils import chunker
3638from utils import exception_stack_catcher
3739from utils import get_path
@@ -282,7 +284,6 @@ def _combined_chunk_res(q_res, _vars, _ret_val_mapping):
282284 return chunk_res
283285
284286
285-
286287def count_query (sparql , timeout , graph_pattern , source = None ,
287288 ** kwds ):
288289 assert isinstance (graph_pattern , GraphPattern )
@@ -426,21 +427,6 @@ def variable_substitution_query(
426427 )
427428
428429
429- def variable_substitution_deep_narrow_mut_query (
430- sparql , timeout , graph_pattern , edge_var , node_var ,
431- source_target_pairs , limit_res , batch_size = config .BATCH_SIZE ):
432- _vars , _values , _ret_val_mapping = _get_vars_values_mapping (
433- graph_pattern , source_target_pairs )
434- _edge_var_node_var_and_vars = (edge_var , node_var , _vars )
435- return _multi_query (
436- sparql , timeout , graph_pattern , source_target_pairs , batch_size ,
437- _edge_var_node_var_and_vars , _values , _ret_val_mapping ,
438- _var_subst_res_init , _var_subst_dnp_chunk_q ,
439- _var_subst_dnp_chunk_result_ext , limit = limit_res ,
440- # non standard, passed via **kwds, see handling below
441- )
442-
443-
444430# noinspection PyUnusedLocal
445431def _var_subst_res_init (_ , ** kwds ):
446432 return Counter ()
@@ -455,17 +441,6 @@ def _var_subst_chunk_q(gp, _sel_var_and_vars, values_chunk, limit):
455441 limit = limit )
456442
457443
458- def _var_subst_dnp_chunk_q (gp , _edge_var_node_var_and_vars ,
459- values_chunk , limit ):
460- edge_var , node_var , _vars = _edge_var_node_var_and_vars
461- return gp .to_find_edge_var_for_narrow_path_query (
462- edge_var = edge_var ,
463- node_var = node_var ,
464- vars_ = _vars ,
465- values = {_vars : values_chunk },
466- limit_res = limit )
467-
468-
469444# noinspection PyUnusedLocal
470445def _var_subst_chunk_result_ext (q_res , _sel_var_and_vars , _ , ** kwds ):
471446 var , _vars = _sel_var_and_vars
@@ -482,23 +457,70 @@ def _var_subst_chunk_result_ext(q_res, _sel_var_and_vars, _, **kwds):
482457 return chunk_res
483458
484459
485- def _var_subst_dnp_chunk_result_ext (q_res , _edge_var_node_var_and_vars , _ , ** kwds ):
460+ def _var_subst_res_update (res , update , ** _ ):
461+ res += update
462+
463+
464+ def variable_substitution_deep_narrow_mut_query (
465+ sparql , timeout , graph_pattern , edge_var , node_var ,
466+ source_target_pairs , limit_res , batch_size = config .BATCH_SIZE ):
467+ _vars , _values , _ret_val_mapping = _get_vars_values_mapping (
468+ graph_pattern , source_target_pairs )
469+ _edge_var_node_var_and_vars = (edge_var , node_var , _vars )
470+ return _multi_query (
471+ sparql , timeout , graph_pattern , source_target_pairs , batch_size ,
472+ _edge_var_node_var_and_vars , _values , _ret_val_mapping ,
473+ _var_subst_dnp_res_init , _var_subst_dnp_chunk_q ,
474+ _var_subst_dnp_chunk_result_ext ,
475+ _res_update = _var_subst_dnp_update ,
476+ limit = limit_res ,
477+ # non standard, passed via **kwds, see handling below
478+ )
479+
480+
481+ # noinspection PyUnusedLocal
482+ def _var_subst_dnp_res_init (_ , ** kwds ):
483+ return Counter (), Counter ()
484+
485+
486+ def _var_subst_dnp_chunk_q (gp , _edge_var_node_var_and_vars ,
487+ values_chunk , limit ):
486488 edge_var , node_var , _vars = _edge_var_node_var_and_vars
487- chunk_res = Counter ()
489+ return gp .to_find_edge_var_for_narrow_path_query (
490+ edge_var = edge_var ,
491+ node_var = node_var ,
492+ vars_ = _vars ,
493+ values = {_vars : values_chunk },
494+ limit_res = limit )
495+
496+
497+ # noinspection PyUnusedLocal
498+ def _var_subst_dnp_chunk_result_ext (
499+ q_res , _edge_var_node_var_and_vars , _ , ** kwds ):
500+ edge_var , node_var , _vars = _edge_var_node_var_and_vars
501+ chunk_edge_count , chunk_node_sum = Counter (), Counter ()
488502 res_rows_path = ['results' , 'bindings' ]
489503 bindings = sparql_json_result_bindings_to_rdflib (
490504 get_path (q_res , res_rows_path , default = [])
491505 )
492506
493507 for row in bindings :
494508 row_res = get_path (row , [edge_var ])
495- count_res = int (get_path (row , [COUNT_VAR ], '0' ))
496- chunk_res [row_res ] += count_res
497- return chunk_res
509+ edge_count = int (get_path (row , [EDGE_VAR_COUNT ], '0' ))
510+ chunk_edge_count [row_res ] += edge_count
511+ node_sum_count = int (get_path (row , [NODE_VAR_SUM ], '0' ))
512+ chunk_node_sum [row_res ] += node_sum_count
513+ return chunk_edge_count , chunk_node_sum ,
498514
499515
500- def _var_subst_res_update (res , update , ** _ ):
501- res += update
516+ def _var_subst_dnp_update (res , up , ** _ ):
517+ edge_count , node_sum_count = res
518+ try :
519+ chunk_edge_count , chunk_node_sum = up
520+ edge_count .update (chunk_edge_count )
521+ node_sum_count .update (chunk_node_sum )
522+ except ValueError :
523+ pass
502524
503525
504526def generate_stps_from_gp (sparql , gp ):
0 commit comments