3232 'mock' ,
3333 'after_idle_for' ,
3434 'rayvens_logs' ,
35- 'alg_num_cpus'
35+ 'alg_num_cpus' ,
36+ 'data_num_cpus'
3637 ]
3738 )
3839
@@ -194,20 +195,20 @@ def _number_of_iterations(process_input, args, process_type):
194195def _get_extra_input (input_name , process_type , configs , args , buckets ):
195196 try :
196197 if process_type == 'input' :
197- extra = {}
198+ extra = {'mock' : args . mock }
198199 elif process_type == 'objective' :
199- extra = {}
200+ extra = {'mock' : args . mock }
200201 elif process_type == 'data' :
201202 files = files_from_data (input_name )
202203 storage = Storage (configs )
203204 objective = storage .get (buckets ['objectives_dest' ],files ['objective' ])
204- extra = {'objective' : json .load (objective )}
205+ extra = {'num_cpus' : args . data_num_cpus , ' objective' : json .load (objective ), 'mock' : args . mock }
205206 elif process_type == 'solution' :
206207 files = files_from_solution (input_name )
207208 storage = Storage (configs )
208209 objective = storage .get (buckets ['objectives_dest' ],files ['objective' ])
209210 data = storage .get (buckets ['data_dest' ],files ['data' ])
210- extra = {'is_mcmc' : args .mcmc , 'objective' : json .load (objective ), 'data' : pd .read_csv (data )}
211+ extra = {'is_mcmc' : args .mcmc , 'objective' : json .load (objective ), 'data' : pd .read_csv (data ), 'mock' : args . mock }
211212 else :
212213 extra = None
213214 return extra
@@ -222,10 +223,19 @@ def _get_extra_input(input_name, process_type, configs, args, buckets):
222223
223224def _process (process_type , configs , args , buckets , ** kwargs ):
224225 def proc (f ):
225-
226- @ray .remote (num_cpus = 1 )
227- def f_dist (* args ,** kwargs ):
228- return f (* args ,** kwargs )
226+
227+ if process_type == 'data' :
228+ @ray .remote (num_cpus = args .data_num_cpus )
229+ def f_dist (* args ,** kwargs ):
230+ return f (* args ,** kwargs )
231+ elif process_type == 'solution' :
232+ @ray .remote (num_cpus = args .alg_num_cpus )
233+ def f_dist (* args ,** kwargs ):
234+ return f (* args ,** kwargs )
235+ else :
236+ @ray .remote (num_cpus = 1 )
237+ def f_dist (* args ,** kwargs ):
238+ return f (* args ,** kwargs )
229239
230240 def inner (context , event ):
231241
@@ -253,12 +263,8 @@ def inner(context, event):
253263
254264 extra = _get_extra_input (input_name , process_type , configs , args , buckets )
255265 assert extra is not None , 'Extra input is None for event {}.' .format (input_name )
256- if args .logger :
257- if extra :
258- print ('({}) INFO ... Process working on event {} uses extra input {}.' .format (process_type ,input_name ,list (extra .keys ())))
259- else :
260- print ('({}) INFO ... Process working on event {} does not require extra input.' .format (process_type ,input_name ))
261- extra = {** extra ,** kwargs ,** configs ,** {'mock' : args .mock }}
266+
267+ extra = {** extra ,** kwargs ,** configs }
262268
263269 if args .distribute :
264270 _ = [f_dist .remote (context , process_input , input_name , ** extra ) for _ in range (n )]
@@ -311,8 +317,9 @@ def run(generate_user_solution, configs_file, **kwargs):
311317 after_idle_for = kwargs ['after_idle_for' ] if 'after_idle_for' in kwargs else 200
312318 rayvens_logs = kwargs ['rayvens_logs' ] if 'rayvens_logs' in kwargs else False
313319 alg_num_cpus = int (kwargs ['alg_num_cpus' ]) if 'alg_num_cpus' in kwargs else 1
320+ data_num_cpus = int (kwargs ['data_num_cpus' ]) if 'data_num_cpus' in kwargs else 1
314321
315- args = Args (objectives , datasets , feasibility_regions , run_mode , distribute , mcmc , logger , mock , after_idle_for , rayvens_logs , alg_num_cpus )
322+ args = Args (objectives , datasets , feasibility_regions , run_mode , distribute , mcmc , logger , mock , after_idle_for , rayvens_logs , alg_num_cpus , data_num_cpus )
316323
317324 if args .run_mode == 'operator' :
318325 ray .init (address = 'auto' )
@@ -356,7 +363,7 @@ def generate_datasets(context, process_input, input_name, **kwargs):
356363 else :
357364 print ('({}) ERROR ... generated dataset {} not published to context. Either `s3` or `local` missing from cofnigs or both feature.' .format ('objective' ,generated_file ))
358365
359- @ray .remote (num_cpus = args . alg_num_cpus )
366+ @ray .remote (num_cpus = 1 )
360367 @_process ('data' , configs , args , buckets , ** kwargs )
361368 def generate_solutions (context , process_input , input_name , ** kwargs ):
362369 solution , generated_file = generate_user_solution (process_input , input_name , ** kwargs )
0 commit comments