From 07b3600377f9038f8add81945295cd9b621cbd04 Mon Sep 17 00:00:00 2001 From: Sudha Parimala Date: Mon, 2 Aug 2021 13:38:19 +0530 Subject: [PATCH 1/3] add a fast path in parallel scan parallel scan runs a sequential version when the number of elements in the array is less than the number of domains in the pool or when the number of domains in the pool is one --- lib/task.ml | 23 +++++++++++++++++------ test/test_task.ml | 21 +++++++++++++++------ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/lib/task.ml b/lib/task.ml index b74576b..3804a5c 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -110,21 +110,31 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool = let parallel_scan pool op elements = + let n = Array.length elements in + let p = (Array.length pool.domains) + 1 in + let prefix_s = Array.copy elements in + let scan_part op elements prefix_sum start finish = - assert (Array.length elements > (finish - start)); + assert (Array.length elements >= (finish - start)); for i = (start + 1) to finish do prefix_sum.(i) <- op prefix_sum.(i - 1) elements.(i) done in + + if (n <= p) || (p = 1) then begin + (* Performs a sequential scan when number of elements is less than or equal + to the pool size or if the number of domains is one *) + scan_part op elements prefix_s 0 (n - 1); + prefix_s + end + else begin + let add_offset op prefix_sum offset start finish = - assert (Array.length prefix_sum > (finish - start)); + assert (Array.length prefix_sum >= (finish - start)); for i = start to finish do prefix_sum.(i) <- op offset prefix_sum.(i) done in - let n = Array.length elements in - let p = (Array.length pool.domains) + 1 in - let prefix_s = Array.copy elements in parallel_for pool ~chunk_size:1 ~start:0 ~finish:(p - 1) ~body:(fun i -> @@ -148,5 +158,6 @@ let parallel_scan pool op elements = let offset = prefix_s.(s - 1) in add_offset op prefix_s offset s e ); - prefix_s + end + diff --git a/test/test_task.ml b/test/test_task.ml index 
7f2209f..2dcfedb 100644 --- a/test/test_task.ml +++ b/test/test_task.ml @@ -25,19 +25,17 @@ let sum_sequence pool chunk_size init = fun () -> (* Parallel scan *) -let prefix_sum pool = fun () -> +let prefix_sum pool n = fun () -> let prefix_s l = List.rev (List.fold_left (fun a y -> match a with | [] -> [y] | x::_ -> (x+y)::a) [] l) in - let arr = Array.make 1000 1 in + let arr = Array.make n 1 in let v1 = Task.parallel_scan pool (+) arr in let ls = Array.to_list arr in let v2 = prefix_s ls in assert (v1 = Array.of_list v2) - -let () = - let pool = Task.setup_pool ~num_additional_domains:3 in +let run_all pool = fun () -> modify_arr pool 0 (); modify_arr pool 25 (); modify_arr pool 100 (); @@ -51,6 +49,17 @@ let () = sum_sequence pool 1 10 (); sum_sequence pool 100 10 (); sum_sequence pool 100 100 (); - prefix_sum pool (); + prefix_sum pool 1000 (); + prefix_sum pool 3 () + +let () = + let pool = Task.setup_pool ~num_additional_domains:3 in + run_all pool (); Task.teardown_pool pool; + let pool2 = Task.setup_pool ~num_additional_domains:0 in + run_all pool2 (); + Task.teardown_pool pool2; + let pool3 = Task.setup_pool ~num_additional_domains:31 in + run_all pool3 (); + Task.teardown_pool pool3; print_endline "ok" \ No newline at end of file From 24853f67e3e061428813e82c33a1c93c6746d79f Mon Sep 17 00:00:00 2001 From: Sudha Parimala Date: Tue, 3 Aug 2021 11:09:50 +0530 Subject: [PATCH 2/3] parallel_map for arrays --- lib/task.ml | 8 ++++++++ lib/task.mli | 3 +++ test/test_task.ml | 12 +++++++++++- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/lib/task.ml b/lib/task.ml index 3804a5c..d997d23 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -161,3 +161,11 @@ let parallel_scan pool op elements = prefix_s end + +let parallel_map pool ?(chunk_size=0) f arr = + let len = Array.length arr in + let res = Array.make len (f arr.(0)) in + parallel_for ~chunk_size ~start:1 ~finish:(len - 1) + ~body:(fun i -> + res.(i) <- (f arr.(i))) pool; + res diff --git 
a/lib/task.mli b/lib/task.mli index 2849fa7..7682600 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -53,3 +53,6 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array * intermediate values. The reduce operations are performed in an arbitrary * order and the reduce function needs to be commutative and associative in * order to obtain a deterministic result *) + +val parallel_map : pool -> ?chunk_size:int -> ('a -> 'b ) -> 'a array + -> 'b array diff --git a/test/test_task.ml b/test/test_task.ml index 2dcfedb..66b6bf5 100644 --- a/test/test_task.ml +++ b/test/test_task.ml @@ -35,6 +35,13 @@ let prefix_sum pool n = fun () -> let v2 = prefix_s ls in assert (v1 = Array.of_list v2) +let parallel_map pool chunk_size = fun () -> + let arr = Array.init 1000 (fun i -> i) in + let res_1 = + Task.parallel_map pool ~chunk_size (fun x -> x + 3) arr in + let res_2 = Array.map (fun x -> x + 3) arr in + assert (res_1 = res_2) + let run_all pool = fun () -> modify_arr pool 0 (); modify_arr pool 25 (); @@ -50,7 +57,10 @@ let run_all pool = fun () -> sum_sequence pool 100 10 (); sum_sequence pool 100 100 (); prefix_sum pool 1000 (); - prefix_sum pool 3 () + prefix_sum pool 3 (); + parallel_map pool 0 (); + parallel_map pool 10 (); + parallel_map pool 100 () let () = let pool = Task.setup_pool ~num_additional_domains:3 in From 93727bba32152eb98311a67492e0d55b6ebbebb2 Mon Sep 17 00:00:00 2001 From: Sudha Parimala Date: Tue, 3 Aug 2021 11:21:52 +0530 Subject: [PATCH 3/3] parallel_map doc --- lib/task.mli | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/task.mli b/lib/task.mli index 7682600..49e20eb 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -56,3 +56,6 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array val parallel_map : pool -> ?chunk_size:int -> ('a -> 'b ) -> 'a array -> 'b array +(** [parallel_map p c f arr] is similar to [Array.map], but runs the computation + * in parallel. 
The [chunk_size] parameter is optional; when not provided, + * it defaults to the chunk size computed in [parallel_for] *)