diff --git a/lib/task.ml b/lib/task.ml
index b74576b..d997d23 100644
--- a/lib/task.ml
+++ b/lib/task.ml
@@ -110,21 +110,31 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
 let parallel_scan pool op elements =
+  let n = Array.length elements in
+  let p = (Array.length pool.domains) + 1 in
+  let prefix_s = Array.copy elements in
+
   let scan_part op elements prefix_sum start finish =
-    assert (Array.length elements > (finish - start));
+    assert (Array.length elements >= (finish - start));
     for i = (start + 1) to finish do
       prefix_sum.(i) <- op prefix_sum.(i - 1) elements.(i)
     done
   in
+
+  if (n <= p) || (p = 1) then begin
+    (* Performs a sequential scan when the number of elements is less than or
+       equal to the pool size, or when there is only one domain *)
+    scan_part op elements prefix_s 0 (n - 1);
+    prefix_s
+  end
+  else begin
+
   let add_offset op prefix_sum offset start finish =
-    assert (Array.length prefix_sum > (finish - start));
+    assert (Array.length prefix_sum >= (finish - start));
     for i = start to finish do
       prefix_sum.(i) <- op offset prefix_sum.(i)
     done
   in
-  let n = Array.length elements in
-  let p = (Array.length pool.domains) + 1 in
-  let prefix_s = Array.copy elements in
 
   parallel_for pool ~chunk_size:1 ~start:0 ~finish:(p - 1)
   ~body:(fun i ->
@@ -148,5 +158,14 @@ let parallel_scan pool op elements =
     let offset = prefix_s.(s - 1) in
       add_offset op prefix_s offset s e
     );
-
   prefix_s
+  end
+
+
+let parallel_map pool ?(chunk_size=0) f arr =
+  let len = Array.length arr in
+  let res = Array.make len (f arr.(0)) in
+  parallel_for ~chunk_size ~start:1 ~finish:(len - 1)
+  ~body:(fun i ->
+    res.(i) <- (f arr.(i))) pool;
+  res
diff --git a/lib/task.mli b/lib/task.mli
index 2849fa7..49e20eb 100644
--- a/lib/task.mli
+++ b/lib/task.mli
@@ -53,3 +53,9 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array
  * intermediate values. The reduce operations are performed in an arbitrary
  * order and the reduce function needs to be commutative and associative in
  * order to obtain a deterministic result *)
+
+val parallel_map : pool -> ?chunk_size:int -> ('a -> 'b) -> 'a array
+  -> 'b array
+(** [parallel_map p c f arr] is similar to [Array.map], but runs the computation
+ * in parallel. The [chunk_size] parameter is optional; when not provided, it
+ * defaults to the chunk size computed in [parallel_for] *)
diff --git a/test/test_task.ml b/test/test_task.ml
index 7f2209f..66b6bf5 100644
--- a/test/test_task.ml
+++ b/test/test_task.ml
@@ -25,19 +25,24 @@ let sum_sequence pool chunk_size init = fun () ->
 (* Parallel scan *)
-let prefix_sum pool = fun () ->
+let prefix_sum pool n = fun () ->
   let prefix_s l =
     List.rev (List.fold_left (fun a y -> match a with
     | [] -> [y]
     | x::_ -> (x+y)::a) [] l)
   in
-  let arr = Array.make 1000 1 in
+  let arr = Array.make n 1 in
   let v1 = Task.parallel_scan pool (+) arr in
   let ls = Array.to_list arr in
   let v2 = prefix_s ls in
   assert (v1 = Array.of_list v2)
 
+let parallel_map pool chunk_size = fun () ->
+  let arr = Array.init 1000 (fun i -> i) in
+  let res_1 =
+    Task.parallel_map pool ~chunk_size (fun x -> x + 3) arr in
+  let res_2 = Array.map (fun x -> x + 3) arr in
+  assert (res_1 = res_2)
 
-let () =
-  let pool = Task.setup_pool ~num_additional_domains:3 in
+let run_all pool = fun () ->
   modify_arr pool 0 ();
   modify_arr pool 25 ();
   modify_arr pool 100 ();
@@ -51,6 +56,20 @@ let () =
   sum_sequence pool 1 10 ();
   sum_sequence pool 100 10 ();
   sum_sequence pool 100 100 ();
-  prefix_sum pool ();
+  prefix_sum pool 1000 ();
+  prefix_sum pool 3 ();
+  parallel_map pool 0 ();
+  parallel_map pool 10 ();
+  parallel_map pool 100 ()
+
+let () =
+  let pool = Task.setup_pool ~num_additional_domains:3 in
+  run_all pool ();
   Task.teardown_pool pool;
+  let pool2 = Task.setup_pool ~num_additional_domains:0 in
+  run_all pool2 ();
+  Task.teardown_pool pool2;
+  let pool3 = Task.setup_pool ~num_additional_domains:31 in
+  run_all pool3 ();
+  Task.teardown_pool pool3;
   print_endline "ok"
\ No newline at end of file
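
A minimal usage sketch of the new parallel_map (not part of the patch). It assumes the library is reachable as Domainslib.Task and that the input array is non-empty, since the implementation seeds the result array with f arr.(0):

  (* Map a function over an array in parallel and compare the result
     against the sequential Array.map. Omitting ~chunk_size falls back
     to the default chunk size chosen by parallel_for. *)
  let () =
    let module Task = Domainslib.Task in
    let pool = Task.setup_pool ~num_additional_domains:3 in
    let input = Array.init 100 (fun i -> i) in
    let doubled = Task.parallel_map pool (fun x -> 2 * x) input in
    assert (doubled = Array.map (fun x -> 2 * x) input);
    (* An explicit chunk size can also be supplied: *)
    let shifted = Task.parallel_map pool ~chunk_size:16 (fun x -> x + 1) input in
    assert (shifted = Array.map (fun x -> x + 1) input);
    Task.teardown_pool pool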