Commit 9ee646b

Merge pull request #2 from IBM/file_sys
File system and mock mode
2 parents 06fb383 + 0abe0c8 commit 9ee646b

File tree

6 files changed: +695 −96 lines

doframework/api.py

Lines changed: 148 additions & 89 deletions
@@ -8,80 +8,102 @@
 import ray
 import rayvens
-import ibm_boto3
-import boto3
 
+from doframework.core.inputs import get_configs
+from doframework.core.storage import Storage, CamelKeysDict
 from doframework.flow.objectives import generate_objective, calculate_objectives
 from doframework.flow.datasets import generate_dataset
 from doframework.flow.solutions import generate_solution, files_from_data
 from doframework.flow.metrics import generate_metric, files_from_solution
+from doframework.flow.mock import generate_objective_mock, generate_dataset_mock, generate_solution_mock, generate_metric_mock, sleep_time
 
 #################################################################################################################
 #################################################################################################################
 
-def _get_s3_object(configs):
-
-    s3 = configs['s3']
-
-    assert 'cloud_service_provider' in s3, 'Missing s3:cloud_service_provider in configs.'
-    assert s3['cloud_service_provider'] in ['ibm','aws'], 'cloud_service_provider in configs must be either `aws` or `ibm`.'
-
-    if s3['cloud_service_provider'] == 'ibm':
-
-        return ibm_boto3.resource(service_name='s3',
-                            region_name=s3['region'],
-                            endpoint_url=s3['endpoint_url'],
-                            aws_access_key_id=s3['aws_access_key_id'],
-                            aws_secret_access_key=s3['aws_secret_access_key'])
+Args = namedtuple(
+    'Args',[
+        'objectives',
+        'datasets',
+        'feasibility_regions',
+        'run_mode',
+        'distribute',
+        'mcmc',
+        'logger',
+        'mock',
+        'after_idle_for',
+        'rayvens_logs',
+        'alg_num_cpus'
+    ]
+)
 
-    if s3['cloud_service_provider'] == 'aws':
+GenerateFunctionsDict = {
+    'objective': generate_objective,
+    'objective_mock': generate_objective_mock,
+    'data' : generate_dataset,
+    'data_mock' : generate_dataset_mock,
+    'solution' : generate_solution,
+    'solution_mock' : generate_solution_mock,
+    'metric' : generate_metric,
+    'metric_mock' : generate_metric_mock,
+}
 
-        return boto3.resource(service_name='s3',
-                            region_name=s3['region'],
-                            aws_access_key_id=s3['aws_access_key_id'],
-                            aws_secret_access_key=s3['aws_secret_access_key'])
+#################################################################################################################
+#################################################################################################################
 
-def _get_buckets(configs):
+def _get_source_config(from_bucket, to_bucket, configs):
 
-    s3_buckets = _get_s3_object(configs).buckets.all()
-    s3_buckets = [bucket.name for bucket in s3_buckets]
+    d = {}
 
-    return {name: bucket for name, bucket in configs['s3']['buckets'].items() if bucket in s3_buckets}
-
-def _get_source_config(from_bucket, to_bucket, configs):
+    if 's3' in configs:
 
-    s3 = configs['s3']
+        s3 = configs['s3']
 
-    d = dict(kind='cloud-object-storage-source',
-             name='source',
-             bucket_name=from_bucket,
-             access_key_id=s3['aws_access_key_id'],
-             secret_access_key=s3['aws_secret_access_key'],
-             region=s3['region'],
-             move_after_read=to_bucket
-    )
+        d = dict(kind='cloud-object-storage-source',
+                 bucket_name=from_bucket,
+                 access_key_id=s3['aws_access_key_id'],
+                 secret_access_key=s3['aws_secret_access_key'],
+                 region=s3['region'],
+                 move_after_read=to_bucket
+        )
 
-    if 'endpoint_url' in s3:
+        if 'endpoint_url' in s3:
 
-        d = {**d,**dict(endpoint=s3['endpoint_url'])}
+            d = {**d,**dict(endpoint=s3['endpoint_url'])}
+
+    if 'local' in configs:
+
+        d = dict(kind='file-source',
+                 path=from_bucket,
+                 keep_file=False,
+                 move_after_read=to_bucket
+        )
 
     return d
 
 def _get_sink_config(to_bucket, configs):
 
-    s3 = configs['s3']
+    d = {}
 
-    d = dict(kind='cloud-object-storage-sink',
-             name='sink',
-             bucket_name=to_bucket,
-             access_key_id=s3['aws_access_key_id'],
-             secret_access_key=s3['aws_secret_access_key'],
-             region=s3['region']
-    )
+    if 's3' in configs:
+
+        s3 = configs['s3']
 
-    if 'endpoint_url' in s3:
+        d = dict(kind='cloud-object-storage-sink',
+                 bucket_name=to_bucket,
+                 access_key_id=s3['aws_access_key_id'],
+                 secret_access_key=s3['aws_secret_access_key'],
+                 region=s3['region']
+        )
 
-        d = {**d,**dict(endpoint=s3['endpoint_url'])}
+        if 'endpoint_url' in s3:
+
+            d = {**d,**dict(endpoint=s3['endpoint_url'])}
+
+    if 'local' in configs:
+
+        d = dict(kind='file-sink',
+                 path=to_bucket
+        )
 
     return d
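For orientation, the keys read by the new `_get_source_config` / `_get_sink_config` branches suggest roughly the following shapes for the two supported configs. This is a hedged sketch, not part of the diff; the contents of the `local` section are not shown in this file, so they are left as a placeholder.

    # S3-backed configs: only the keys actually read above are meaningful;
    # credentials, region, and bucket mapping are placeholders.
    s3_configs = {
        's3': {
            'region': 'us-east-1',
            'endpoint_url': 'https://s3.us-east-1.amazonaws.com',   # optional
            'aws_access_key_id': '<KEY_ID>',
            'aws_secret_access_key': '<SECRET>',
            'buckets': {...},   # bucket-name mapping, now resolved via Storage(configs).buckets()
        }
    }

    # Local file-system configs: the mere presence of the 'local' key switches
    # sources/sinks to kind='file-source' / 'file-sink'; its schema is defined
    # elsewhere (doframework.core.storage), so nothing is assumed here.
    local_configs = {
        'local': {...}
    }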

@@ -127,10 +149,29 @@ def _get_event_type(process_type,args):
             print('({}) ERROR ... '.format(process_type) + e)
         return None
 
+def _get_event(process_type,process_json,event_type,args):
+    try:
+        if event_type == 'json':
+            process_input = json.loads(process_json['body'])
+        elif event_type == 'csv':
+            process_input = pd.read_csv(StringIO(process_json['body']))
+        else:
+            process_input = None
+        return process_input
+    except json.JSONDecodeError as e:
+        if args.logger:
+            print('({}) ERROR ... Error occured while decoding file {} in json loads.'.format(process_json['filename']))
+            print('({}) ERROR ... '.format(process_type) + e)
+    except Exception as e:
+        if args.logger:
+            print('({}) ERROR ... Error occured when extracting event content.'.format(process_type))
+            print('({}) ERROR ... '.format(process_type) + e)
+    return None
+
 def _number_of_iterations(process_input, args, process_type):
     try:
         if process_type == 'input':
-            n = calculate_objectives(process_input,args)
+            n = args.objectives if args.mock else calculate_objectives(process_input,args)
         elif process_type == 'objective':
             n = args.datasets
         elif process_type == 'data':
@@ -140,6 +181,10 @@ def _number_of_iterations(process_input, args, process_type):
         else:
             n = None
         return n
+    except KeyError as e:
+        if args.logger:
+            print('({}) ERROR ... Error occured when calculating n in number_of_iterations.'.format(process_type))
+            print('({}) ERROR ... '.format(process_type) + e)
     except Exception as e:
         if args.logger:
             print('({}) ERROR ... Error occured when calculating n in number_of_iterations.'.format(process_type))
@@ -154,14 +199,15 @@ def _get_extra_input(input_name, process_type, configs, args, buckets):
         extra = {}
     elif process_type == 'data':
         files = files_from_data(input_name)
-        objective = _get_s3_object(configs).Bucket(buckets['objectives_dest']).Object(files['objective']).get()
-        extra = {'objective': json.load(objective['Body'])}
+        storage = Storage(configs)
+        objective = storage.get(buckets['objectives_dest'],files['objective'])
+        extra = {'objective': json.load(objective)}
     elif process_type == 'solution':
         files = files_from_solution(input_name)
-        s3 = _get_s3_object(configs)
-        objective = s3.Bucket(buckets['objectives_dest']).Object(files['objective']).get()
-        data = s3.Bucket(buckets['data_dest']).Object(files['data']).get()
-        extra = {'is_mcmc': args.mcmc, 'objective': json.load(objective['Body']), 'data': pd.read_csv(data['Body'])}
+        storage = Storage(configs)
+        objective = storage.get(buckets['objectives_dest'],files['objective'])
+        data = storage.get(buckets['data_dest'],files['data'])
+        extra = {'is_mcmc': args.mcmc, 'objective': json.load(objective), 'data': pd.read_csv(data)}
     else:
         extra = None
     return extra
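The replacement lines assume a `Storage` wrapper whose `get(bucket, filename)` returns a file-like object, so the same call feeds `json.load` and `pd.read_csv` whether the backend is S3 or the local file system. A minimal usage sketch under that assumption (bucket and file names are placeholders):

    import json
    import pandas as pd
    from doframework.core.storage import Storage

    storage = Storage(configs)                   # configs holds either an 's3' or a 'local' section
    objective = json.load(storage.get('objectives-bucket', 'objective.json'))   # placeholder names
    data = pd.read_csv(storage.get('data-bucket', 'data.csv'))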
@@ -189,13 +235,7 @@ def inner(context, event):
         assert ('body' in process_json) and ('filename' in process_json), 'Missing fields body and / or filename in event json.'
 
         event_type = _get_event_type(process_type,args)
-
-        if event_type == 'json':
-            process_input = json.loads(process_json['body'])
-        elif event_type == 'csv':
-            process_input = pd.read_csv(StringIO(process_json['body']))
-        else:
-            process_input = None
+        process_input = _get_event(process_type,process_json,event_type,args)
         assert process_input is not None, 'Unable to extract process input. Perhaps illegal event_type={}.'.format(event_type)
         if args.logger: print('({}) INFO ... Process successfully extracted event of type {}.'.format(process_type, event_type))

@@ -218,7 +258,7 @@ def inner(context, event):
                 print('({}) INFO ... Process working on event {} uses extra input {}.'.format(process_type,input_name,list(extra.keys())))
             else:
                 print('({}) INFO ... Process working on event {} does not require extra input.'.format(process_type,input_name))
-        extra = {**extra,**kwargs}
+        extra = {**extra,**kwargs,**configs,**{'mock': args.mock}}
 
         if args.distribute:
             _ = [f_dist.remote(context, process_input, input_name, **extra) for _ in range(n)]
@@ -249,16 +289,15 @@ def inner(context, event):
 def resolve(predict_optimize):
 
     def inner(process_input, input_name, **kwargs):
-
-        return generate_solution(predict_optimize, process_input, input_name, **kwargs)
+
+        key = 'solution_mock' if 'mock' in kwargs and kwargs['mock'] else 'solution' # !!!
+        return GenerateFunctionsDict[key](predict_optimize, process_input, input_name, **kwargs)
 
     return inner
 
 #################################################################################################################
 #################################################################################################################
 
-Args = namedtuple('Args',['objectives','datasets','feasibility_regions','run_mode','distribute','mcmc','logger','after_idle_for','rayvens_logs','alg_num_cpus'])
-
 def run(generate_user_solution, configs_file, **kwargs):
 
     objectives = int(kwargs['objectives']) if 'objectives' in kwargs else 1
@@ -268,11 +307,12 @@ def run(generate_user_solution, configs_file, **kwargs):
     distribute = kwargs['distribute'] if 'distribute' in kwargs else True
     mcmc = kwargs['mcmc'] if 'mcmc' in kwargs else False
     logger = kwargs['logger'] if 'logger' in kwargs else True
+    mock = kwargs['mock'] if 'mock' in kwargs else False
     after_idle_for = kwargs['after_idle_for'] if 'after_idle_for' in kwargs else 200
     rayvens_logs = kwargs['rayvens_logs'] if 'rayvens_logs' in kwargs else False
     alg_num_cpus = int(kwargs['alg_num_cpus']) if 'alg_num_cpus' in kwargs else 1
 
-    args = Args(objectives, datasets, feasibility_regions, run_mode, distribute, mcmc, logger, after_idle_for, rayvens_logs, alg_num_cpus)
+    args = Args(objectives, datasets, feasibility_regions, run_mode, distribute, mcmc, logger, mock, after_idle_for, rayvens_logs, alg_num_cpus)
 
     if args.run_mode == 'operator':
         ray.init(address='auto')
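Given the keyword parsing above, a call to `run` in mock mode would look roughly like the sketch below. This is hedged: `my_generate_solution` and the configs path are user-supplied placeholders, and only keywords that appear in the `Args` namedtuple are used.

    from doframework.api import run

    run(my_generate_solution, 'path/to/configs.yaml',
        objectives=2, datasets=3, feasibility_regions=1,
        distribute=True, mcmc=False, logger=True,
        mock=True,              # new flag: route generation through the *_mock functions
        after_idle_for=200, rayvens_logs=False, alg_num_cpus=1)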
@@ -283,45 +323,64 @@ def run(generate_user_solution, configs_file, **kwargs):
     if args.logger: print('({}) INFO ... Running simulation with args objectives={o} datasets={s} feasibility_regions={r} distribute={d} run_mode={m} logger={l}'.format('root',
         o=args.objectives, s=args.datasets, r=args.feasibility_regions, d=args.distribute, m=args.run_mode, l=args.logger))
 
-    with open(configs_file,'r') as file:
-        try:
-            configs = yaml.safe_load(file)
-        except yaml.YAMLError as e:
-            if args.logger:
-                print('({}) ERROR ... Could not load configs yaml.'.format('root'))
-                print(e)
-            raise e
+    if args.mock:
+        if args.logger: print('({}) INFO ... Running in MOCK mode.'.format('root'))
 
-    buckets = _get_buckets(configs)
+    configs = get_configs(configs_file)
+    storage = Storage(configs)
+    buckets = storage.buckets()
 
     @ray.remote(num_cpus=1)
     @_process('input', configs, args, buckets)
     def generate_objectives(context, process_input, input_name, **kwargs):
-        objective, generated_file = generate_objective(process_input, input_name,**kwargs)
-        event = rayvens.OutputEvent(json.dumps(objective),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
+        key = 'objective_mock' if 'mock' in kwargs and kwargs['mock'] else 'objective'
+        objective, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(json.dumps(objective),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated objective {} not published to context. Either `s3` or `local` missing from cofnigs or both feature.'.format('input',generated_file))
 
     @ray.remote(num_cpus=1)
     @_process('objective', configs, args, buckets)
     def generate_datasets(context, process_input, input_name, **kwargs):
-        df, generated_file = generate_dataset(process_input, input_name, **kwargs)
-        event = rayvens.OutputEvent(df.to_csv(index=False),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
+        key = 'data_mock' if 'mock' in kwargs and kwargs['mock'] else 'data'
+        df, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(df.to_csv(index=False),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated dataset {} not published to context. Either `s3` or `local` missing from cofnigs or both feature.'.format('objective',generated_file))
 
     @ray.remote(num_cpus=args.alg_num_cpus)
     @_process('data', configs, args, buckets, **kwargs)
     def generate_solutions(context, process_input, input_name, **kwargs):
         solution, generated_file = generate_user_solution(process_input, input_name, **kwargs)
-        event = rayvens.OutputEvent(json.dumps(solution),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(json.dumps(solution),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated solution {} not published to context. Either `s3` or `local` missing from cofnigs or both feature.'.format('data',generated_file))
 
     @ray.remote(num_cpus=1)
     @_process('solution', configs, args, buckets)
     def generate_metrics(context, process_input, input_name, **kwargs):
-        metric, generated_file = generate_metric(process_input, input_name, **kwargs)
-        event = rayvens.OutputEvent(json.dumps(metric),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
-
+        key = 'metric_mock' if 'mock' in kwargs and kwargs['mock'] else 'metric'
+        metric, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(json.dumps(metric),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated metric {} not published to context. Either `s3` or `local` missing from cofnigs or both feature.'.format('solution',generated_file))
+
     sources = ['inputs', 'objectives', 'data', 'solutions']
     targets = ['objectives', 'data', 'solutions', 'metrics_dest']
     operators = [generate_objectives, generate_datasets, generate_solutions, generate_metrics]
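Each generator above publishes only when exactly one of 's3' or 'local' appears in kwargs: the `any(...) and not all(...)` pair is an exclusive-or, and the string-multiplication trick selects the matching `CamelKeysDict` entry. A standalone sketch of the same check, with the 'local' mapping value assumed for illustration (the real mapping lives in doframework.core.storage.CamelKeysDict):

    # 's3' -> 'CamelAwsS3Key' appears in the removed lines above;
    # the 'local' value here is a guess, not taken from the diff.
    CamelKeysDict = {'s3': 'CamelAwsS3Key', 'local': 'CamelFileName'}

    def storage_key(kwargs):
        has_s3, has_local = 's3' in kwargs, 'local' in kwargs
        if has_s3 != has_local:                     # exactly one backend configured
            return 's3' if has_s3 else 'local'      # same result as 'local'*(...) + 's3'*(...)
        return None                                 # none or both present: nothing is published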
