|
16 | 16 | import ibllib.oneibl.registration as registration
|
17 | 17 |
|
18 | 18 | _logger = logging.getLogger('ibllib')
|
19 |
| -LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC'] # 'TrainingDLC', |
| 19 | +LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC'] |
20 | 20 |
|
21 | 21 |
|
22 | 22 | def _get_pipeline_class(session_path, one):
|
@@ -135,39 +135,39 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
|
135 | 135 | return all_datasets
|
136 | 136 |
|
137 | 137 |
|
138 |
def task_queue(mode='all', lab=None, one=None):
    """
    Query waiting tasks from the specified lab, ordered by descending priority.

    :param mode: 'all', 'small' or 'large' — whether to return all waiting tasks, or only
        those outside (small) / inside (large) the LARGE_TASKS list
    :param lab: lab name as per Alyx, otherwise try to infer it from the local globus install
    :param one: ONE instance; a new one is instantiated if None
    :return: list of waiting task dictionaries sorted by descending priority, or None if
        no lab was provided and none could be inferred
    :raises ValueError: if mode is not one of 'all', 'small' or 'large'
    """
    if one is None:
        one = ONE(cache_rest=None)
    if lab is None:
        _logger.info("Trying to infer lab from globus installation")
        lab = _get_lab(one)
    if lab is None:
        _logger.error("No lab provided or found")
        return  # if the lab is none, this will return empty tasks each time
    # Filter for tasks
    if mode == 'all':
        waiting_tasks = one.alyx.rest('tasks', 'list', status='Waiting',
                                      django=f'session__lab__name__in,{lab}', no_cache=True)
    elif mode == 'small':
        tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
                                  django=f'session__lab__name__in,{lab}', no_cache=True)
        waiting_tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
    elif mode == 'large':
        waiting_tasks = one.alyx.rest('tasks', 'list', status='Waiting',
                                      django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)
    else:
        # Fail fast: previously an unrecognised mode left `waiting_tasks` unassigned and
        # crashed below with an UnboundLocalError
        raise ValueError(f"Unknown mode '{mode}', expected one of 'all', 'small' or 'large'")
    # Order tasks by priority (highest first); sorted() is stable so within a priority
    # level the server's ordering is preserved
    sorted_tasks = sorted(waiting_tasks, key=lambda d: d['priority'], reverse=True)
    return sorted_tasks
171 | 171 |
|
172 | 172 |
|
173 | 173 | def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
|
|
0 commit comments