From 8b8e8b25a6fceaed5c0ba8353ef9f832cce6e5ee Mon Sep 17 00:00:00 2001 From: olivier Date: Tue, 16 Jul 2024 15:56:53 +0100 Subject: [PATCH 01/30] remove deprecated spike sorting task --- ibllib/pipes/ephys_preprocessing.py | 320 ---------------------------- 1 file changed, 320 deletions(-) diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py index 0e18fbd65..51f10f4ee 100644 --- a/ibllib/pipes/ephys_preprocessing.py +++ b/ibllib/pipes/ephys_preprocessing.py @@ -217,326 +217,6 @@ def _run(self, overwrite=False): return [output_file] -class SpikeSorting(tasks.Task): - """ - (DEPRECATED) Pykilosort 2.5 pipeline - """ - gpu = 1 - io_charge = 100 # this jobs reads raw ap files - priority = 60 - level = 1 # this job doesn't depend on anything - force = True - job_size = 'large' - - SHELL_SCRIPT = Path.home().joinpath( - "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh" - ) - SPIKE_SORTER_NAME = 'pykilosort' - PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort') - signature = { - 'input_files': [], # see setUp method for declaration of inputs - 'output_files': [] # see setUp method for declaration of inputs - } - - def __init__(self, *args, **kwargs): - warnings.warn('`pipes.ephys_preprocessing.SpikeSorting` to be removed ' - 'in favour of `pipes.ephys_tasks.SpikeSorting`', - FutureWarning) - super().__init__(*args, **kwargs) - - @staticmethod - def spike_sorting_signature(pname=None): - pname = pname if pname is not None else "probe*" - input_signature = [('*ap.meta', f'raw_ephys_data/{pname}', True), - ('*ap.ch', f'raw_ephys_data/{pname}', True), - ('*ap.cbin', f'raw_ephys_data/{pname}', True), - ('*nidq.meta', 'raw_ephys_data', False), - ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True), - ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True), - ('_spikeglx_sync.times.*', 'raw_ephys_data*', True), - ('_iblrig_taskData.raw.*', 'raw_behavior_data', True), - ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True)] - output_signature = [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{pname}', True), - ('_iblqc_ephysTimeRmsAP.rms.npy', f'raw_ephys_data/{pname}', True), # new ibllib 2.5 - ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'raw_ephys_data/{pname}', True)] # new ibllib 2.5 - return input_signature, output_signature - - @staticmethod - def _sample2v(ap_file): - md = spikeglx.read_meta_data(ap_file.with_suffix(".meta")) - s2v = spikeglx._conversion_sample2v_from_meta(md) - return s2v["ap"][0] - - @staticmethod - def _fetch_pykilosort_version(repo_path): - init_file = Path(repo_path).joinpath('pykilosort', '__init__.py') - version = SpikeSorting._fetch_ks2_commit_hash(repo_path) # default - try: - with open(init_file) as fid: - lines = fid.readlines() - for line in lines: - if line.startswith("__version__ = "): - version = line.split('=')[-1].strip().replace('"', '').replace("'", '') - except Exception: - pass - return f"pykilosort_{version}" - - @staticmethod - def _fetch_pykilosort_run_version(log_file): - """ - Parse the following line (2 formats depending on version) from the log files to get the version - '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n' - '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n' - """ - with open(log_file) as fid: - line = fid.readline() - version = re.search('version (.*), output', line) - version = version or re.search('version (.*)', line) # old versions have output, new have a version line - version = re.sub('\\^[[0-9]+m', '', version.group(1)) # removes the coloring tags - return f"pykilosort_{version}" - - @staticmethod - def _fetch_ks2_commit_hash(repo_path): - command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD" - process = subprocess.Popen( - command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - info, error = process.communicate() - if process.returncode != 0: - _logger.error( - f"Can't fetch pykilsort commit hash, will still attempt to run \n" - f"Error: {error.decode('utf-8')}" - ) - return "" - return info.decode("utf-8").strip() - - @staticmethod - def parse_version(v) -> packaging.version.Version: - """ - Extracts and parses semantic version (major.minor.patch) from a version string. - - Parameters - ---------- - v : str - A version string containing a semantic version. - - Returns - ------- - packaging.version.Version - The parsed semantic version number - - Examples - -------- - >>> SpikeSorting.parse_version('ibl_1.2') - - >>> SpikeSorting.parse_version('pykilosort_ibl_1.2.0-new') - - >>> SpikeSorting.parse_version('ibl_0.2') < SpikeSorting.parse_version('pykilosort_v1') - True - """ - m = re.search(r'(\d+\.?){1,3}', v) - if not m: - raise packaging.version.InvalidVersion(f'Failed to detect SemVer in "{v}"') - return packaging.version.parse(m.group()) - - def setUp(self, probes=None): - """ - Overwrite setup method to allow inputs and outputs to be only one probe - :param probes: list of probes e.g ['probe00'] - :return: - """ - if not probes or len(probes) == 2: - self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature() - else: - self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature(probes[0]) - - return super().setUp(probes=probes) - - def _run_pykilosort(self, ap_file): - """ - Runs the ks2 matlab spike sorting for one probe dataset - the raw spike sorting output is in session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder - (discontinued support for old spike sortings in the probe folder <1.5.5) - :return: path of the folder containing ks2 spike sorting output - """ - self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO) - label = ap_file.parts[-2] # this is usually the probe name - sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label) - self.FORCE_RERUN = False - if not self.FORCE_RERUN: - log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") - if log_file.exists(): - run_version = self._fetch_pykilosort_run_version(log_file) - if self.parse_version(run_version) > self.parse_version('pykilosort_ibl_1.1.0'): - _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log" - f" found in {sorter_dir}, skipping.") - return sorter_dir - else: - self.FORCE_RERUN = True - - print(sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")) - # get the scratch drive from the shell script - with open(self.SHELL_SCRIPT) as fid: - lines = fid.readlines() - line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0] - m = re.search(r"\=(.*?)(\#|\n)", line)[0] - scratch_drive = Path(m[1:-1].strip()) - assert scratch_drive.exists() - - # clean up and create directory, this also checks write permissions - # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00 - # first makes sure the tmp dir is clean - shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True) - temp_dir = scratch_drive.joinpath( - self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label]) - ) - if temp_dir.exists(): # hmmm this has to be decided, we may want to restart ? - # But failed sessions may then clog the scratch dir and have users run out of space - shutil.rmtree(temp_dir, ignore_errors=True) - temp_dir.mkdir(parents=True, exist_ok=True) - - check_nvidia_driver() - command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" - _logger.info(command2run) - process = subprocess.Popen( - command2run, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - executable="/bin/bash", - ) - info, error = process.communicate() - info_str = info.decode("utf-8").strip() - _logger.info(info_str) - if process.returncode != 0: - error_str = error.decode("utf-8").strip() - # try and get the kilosort log if any - for log_file in temp_dir.rglob('*_kilosort.log'): - with open(log_file) as fid: - log = fid.read() - _logger.error(log) - break - raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") - - shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True) - shutil.rmtree(temp_dir, ignore_errors=True) - return sorter_dir - - def _run(self, probes=None): - """ - Multiple steps. For each probe: - - Runs ks2 (skips if it already ran) - - synchronize the spike sorting - - output the probe description files - :param probes: (list of str) if provided, will only run spike sorting for specified probe names - :return: list of files to be registered on database - """ - efiles = spikeglx.glob_ephys_files(self.session_path) - ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()] - out_files = [] - for ap_file, label in ap_files: - if isinstance(probes, list) and label not in probes: - continue - try: - # if the file is part of a sequence, handles the run accordingly - sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json')) - # temporary just skips for now - if sequence_file.exists(): - continue - ks2_dir = self._run_pykilosort(ap_file) # runs ks2, skips if it already ran - probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME) - shutil.rmtree(probe_out_path, ignore_errors=True) - probe_out_path.mkdir(parents=True, exist_ok=True) - spikes.ks2_to_alf( - ks2_dir, - bin_path=ap_file.parent, - out_path=probe_out_path, - bin_file=ap_file, - ampfactor=self._sample2v(ap_file), - ) - logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") - if logfile.exists(): - shutil.copyfile(logfile, probe_out_path.joinpath( - f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) - out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) - out_files.extend(out) - # convert ks2_output into tar file and also register - # Make this in case spike sorting is in old raw_ephys_data folders, for new - # sessions it should already exist - tar_dir = self.session_path.joinpath( - 'spike_sorters', self.SPIKE_SORTER_NAME, label) - tar_dir.mkdir(parents=True, exist_ok=True) - out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN) - out_files.extend(out) - - if self.one: - eid = self.one.path2eid(self.session_path, query_type='remote') - ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote') - if len(ins) != 0: - _logger.info("Creating SpikeSorting QC plots") - plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one) - _ = plot_task.run() - self.plot_tasks.append(plot_task) - - plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one) - _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path))) - self.plot_tasks.append(plot_task) - - resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \ - get('alignment_resolved', False) - if resolved: - chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy')) - out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path) - out_files.extend(out) - - except Exception: - _logger.error(traceback.format_exc()) - self.status = -1 - continue - probe_files = spikes.probes_description(self.session_path, one=self.one) - return out_files + probe_files - - def get_signatures(self, probes=None, **kwargs): - """ - This transforms all wildcards in collection to exact match - :param probes: - :return: - """ - - neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path) - probes = probes or spikeglx.get_probes_from_folder(self.session_path) - - full_input_files = [] - for sig in self.signature['input_files']: - if 'raw_ephys_data*' in sig[1]: - if neuropixel_version != '3A': - full_input_files.append((sig[0], 'raw_ephys_data', sig[2])) - for probe in probes: - full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2])) - - elif 'raw_ephys_data/probe*' in sig[1]: - for probe in probes: - full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2])) - elif 'raw_ephys_data' in sig[1]: - if neuropixel_version != '3A': - full_input_files.append((sig[0], 'raw_ephys_data', sig[2])) - else: - full_input_files.append(sig) - - self.input_files = full_input_files - - full_output_files = [] - for sig in self.signature['output_files']: - if 'probe*' in sig[1]: - for probe in probes: - col = sig[1].split('/')[:-1] + [probe] - full_output_files.append((sig[0], '/'.join(col), sig[2])) - else: - full_input_files.append(sig) - - self.output_files = full_output_files - - class EphysVideoCompress(tasks.Task): priority = 90 level = 0 From 856c79e448d39e8350d6ed1d85e9d71ba76fdad9 Mon Sep 17 00:00:00 2001 From: olivier Date: Tue, 16 Jul 2024 22:46:37 +0100 Subject: [PATCH 02/30] flake --- ibllib/pipes/ephys_preprocessing.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/ibllib/pipes/ephys_preprocessing.py b/ibllib/pipes/ephys_preprocessing.py index 51f10f4ee..ee872cf6f 100644 --- a/ibllib/pipes/ephys_preprocessing.py +++ b/ibllib/pipes/ephys_preprocessing.py @@ -4,8 +4,6 @@ and the dynamic pipeline. """ import logging -import re -import shutil import subprocess from collections import OrderedDict import traceback @@ -15,27 +13,25 @@ import cv2 import numpy as np import pandas as pd -import packaging.version import one.alf.io as alfio from ibldsp.utils import rms import spikeglx from ibllib.misc import check_nvidia_driver -from ibllib.ephys import ephysqc, spikes, sync_probes +from ibllib.ephys import ephysqc, sync_probes from ibllib.io import ffmpeg from ibllib.io.video import label_from_path, assert_valid_label from ibllib.io.extractors import ephys_fpga, ephys_passive, camera from ibllib.pipes import tasks, base_tasks import ibllib.pipes.training_preprocessing as tpp from ibllib.pipes.misc import create_alyx_probe_insertions -from ibllib.qc.alignment_qc import get_aligned_channels +from ibllib.pipes.ephys_tasks import SpikeSorting from ibllib.qc.task_extractors import TaskQCExtractor from ibllib.qc.task_metrics import TaskQC from ibllib.qc.camera import run_all_qc as run_camera_qc from ibllib.qc.dlc import DlcQC -from ibllib.plots.figures import dlc_qc_plot, BehaviourPlots, LfpPlots, ApPlots, BadChannelsAp -from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots +from ibllib.plots.figures import dlc_qc_plot, BehaviourPlots, LfpPlots, BadChannelsAp from ibllib.plots.snapshot import ReportSnapshot from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter From 4d1f1cddada870e8dc21ac6bac3ad9547aa7668d Mon Sep 17 00:00:00 2001 From: olivier Date: Tue, 16 Jul 2024 22:51:09 +0100 Subject: [PATCH 03/30] add pointer to the lock file when the task fails --- ibllib/pipes/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index 2159c8683..2d0712461 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -222,7 +222,7 @@ def run(self, **kwargs): if self.gpu >= 1: if not self._creates_lock(): self.status = -2 - _logger.info(f'Job {self.__class__} exited as a lock was found') + _logger.info(f'Job {self.__class__} exited as a lock was found at {self._lock_file_path()}') new_log = log_capture_string.getvalue() self.log = new_log if self.clobber else self.log + new_log _logger.removeHandler(ch) From 9944e25badcf9d8c44c7be268ebf6d91de39167b Mon Sep 17 00:00:00 2001 From: olivier Date: Wed, 17 Jul 2024 08:24:13 +0100 Subject: [PATCH 04/30] spike sorting loader uses iblsorter as default collection --- brainbox/io/one.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/brainbox/io/one.py b/brainbox/io/one.py index c9b25a778..c0a21803c 100644 --- a/brainbox/io/one.py +++ b/brainbox/io/one.py @@ -866,13 +866,18 @@ def _get_attributes(dataset_types): waveform_attributes = list(set(WAVEFORMS_ATTRIBUTES + waveform_attributes)) return {'spikes': spike_attributes, 'clusters': cluster_attributes, 'waveforms': waveform_attributes} - def _get_spike_sorting_collection(self, spike_sorter='pykilosort'): + def _get_spike_sorting_collection(self, spike_sorter=None): """ Filters a list or array of collections to get the relevant spike sorting dataset if there is a pykilosort, load it """ - collection = next(filter(lambda c: c == f'alf/{self.pname}/{spike_sorter}', self.collections), None) - # otherwise, prefers the shortest + for sorter in list([spike_sorter, 'iblsorter', 'pykilosort']): + if sorter is None: + continue + collection = next(filter(lambda c: c == f'alf/{self.pname}/{sorter}', self.collections), None) + if collection is not None: + return collection + # if none is found amongst the defaults, prefers the shortest collection = collection or next(iter(sorted(filter(lambda c: f'alf/{self.pname}' in c, self.collections), key=len)), None) _logger.debug(f"selecting: {collection} to load amongst candidates: {self.collections}") return collection From ca975bdc8e86be888f0d2f38547727b31dbf7ee4 Mon Sep 17 00:00:00 2001 From: olivier Date: Wed, 17 Jul 2024 08:24:38 +0100 Subject: [PATCH 05/30] set the waveform extraction chunk size to 30_000 --- ibllib/pipes/ephys_tasks.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 4d794b19f..44a723108 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -738,12 +738,11 @@ def _run(self): spike_samples=spikes['samples'], spike_clusters=spikes['clusters'], spike_channels=clusters['channels'][spikes['clusters']], - h=None, # todo the geometry needs to be set using the spikeglx object channel_labels=channels['labels'], max_wf=256, trough_offset=42, spike_length_samples=128, - chunksize_samples=int(3000), + chunksize_samples=int(30_000), n_jobs=None, wfs_dtype=np.float16, preprocessing_steps=["phase_shift", From b6cf50643cadae13e17c784ee44214fb996ee88d Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 19 Jul 2024 09:02:57 +0100 Subject: [PATCH 06/30] bugfix: reflect changes in arguments of waveform extraction --- ibllib/pipes/ephys_tasks.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 44a723108..411d16123 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -649,7 +649,7 @@ def _run_iblsort(self, ap_file): line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0] m = re.search(r"\=(.*?)(\#|\n)", line)[0] scratch_drive = Path(m[1:-1].strip()) - assert scratch_drive.exists() + assert scratch_drive.exists(), f"Scratch drive {scratch_drive} not found" spikesorter_dir = f"{self.version}_{'_'.join(list(self.session_path.parts[-3:]))}_{self.pname}" temp_dir = scratch_drive.joinpath(spikesorter_dir) _logger.info(f"job progress command: tail -f {temp_dir} *.log") @@ -698,6 +698,7 @@ def _run(self): """ efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname)) ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()] + assert len(ap_files) == 0, f"No ap file found for probe {self.session_path.joinpath(self.device_collection, self.pname)}" assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}" ap_file, label = ap_files[0] out_files = [] @@ -733,7 +734,7 @@ def _run(self): clusters = alfio.load_object(probe_out_path, 'clusters', attribute=['channels']) channels = alfio.load_object(probe_out_path, 'channels') extract_wfs_cbin( - cbin_file=ap_file, + bin_file=ap_file, output_dir=probe_out_path, spike_samples=spikes['samples'], spike_clusters=spikes['clusters'], @@ -745,10 +746,7 @@ def _run(self): chunksize_samples=int(30_000), n_jobs=None, wfs_dtype=np.float16, - preprocessing_steps=["phase_shift", - "bad_channel_interpolation", - "butterworth", - "car"] + preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"] ) if self.one: eid = self.one.path2eid(self.session_path, query_type='remote') From d7b9add2df1ed9093f1b3452ebc3a5a29d3fd3f3 Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 19 Jul 2024 09:08:00 +0100 Subject: [PATCH 07/30] assertion reversed fixed --- ibllib/pipes/ephys_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 411d16123..b1a6d46df 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -698,7 +698,7 @@ def _run(self): """ efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname)) ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()] - assert len(ap_files) == 0, f"No ap file found for probe {self.session_path.joinpath(self.device_collection, self.pname)}" + assert len(ap_files) != 0, f"No ap file found for probe {self.session_path.joinpath(self.device_collection, self.pname)}" assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}" ap_file, label = ap_files[0] out_files = [] From 2a018a6a3d14e44b33ca0044b30580cdba32fa63 Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 19 Jul 2024 09:17:44 +0100 Subject: [PATCH 08/30] change entrypoint for spike sorting script --- ibllib/pipes/ephys_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index b1a6d46df..4722dcb5e 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -566,7 +566,7 @@ class SpikeSorting(base_tasks.EphysTask, CellQCMixin): force = True SHELL_SCRIPT = Path.home().joinpath( - "Documents/PYTHON/iblscripts/deploy/serverpc/iblsorter/run_iblsorter.sh" + "Documents/PYTHON/iblscripts/deploy/serverpc/iblsorter/sort_recording.sh" ) SPIKE_SORTER_NAME = 'iblsorter' PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter') From b07f438f36a5522ad7f0a8a13c39ed5ee884f6c9 Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 19 Jul 2024 13:07:28 +0100 Subject: [PATCH 09/30] remove ephys cell qc task from pipeline as it is part of spike sorting --- ibllib/pipes/dynamic_pipeline.py | 2 -- ibllib/pipes/ephys_tasks.py | 36 -------------------------------- 2 files changed, 38 deletions(-) diff --git a/ibllib/pipes/dynamic_pipeline.py b/ibllib/pipes/dynamic_pipeline.py index 109cf5a3f..561cff907 100644 --- a/ibllib/pipes/dynamic_pipeline.py +++ b/ibllib/pipes/dynamic_pipeline.py @@ -494,8 +494,6 @@ def make_pipeline(session_path, **pkwargs): tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=register_task) - tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})( - **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']]) # Video tasks if 'cameras' in devices: diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 4722dcb5e..2ff1f2fdd 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -769,39 +769,3 @@ def _run(self): out_files.extend(out) return out_files - - -class EphysCellsQc(base_tasks.EphysTask, CellQCMixin): - priority = 90 - job_size = 'small' - - @property - def signature(self): - signature = { - 'input_files': [('spikes.times.npy', f'alf/{self.pname}*', True), - ('spikes.clusters.npy', f'alf/{self.pname}*', True), - ('spikes.amps.npy', f'alf/{self.pname}*', True), - ('spikes.depths.npy', f'alf/{self.pname}*', True), - ('clusters.channels.npy', f'alf/{self.pname}*', True)], - 'output_files': [('clusters.metrics.pqt', f'alf/{self.pname}*', True)] - } - return signature - - def _run(self): - """ - Post spike-sorting quality control at the cluster level. - Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx - """ - files_spikes = Path(self.session_path).joinpath('alf', self.pname).rglob('spikes.times.npy') - folder_probes = [f.parent for f in files_spikes] - out_files = [] - for folder_probe in folder_probes: - try: - qc_file, df_units, drift = self.compute_cell_qc(folder_probe) - out_files.append(qc_file) - self._label_probe_qc(folder_probe, df_units, drift) - except Exception: - _logger.error(traceback.format_exc()) - self.status = -1 - continue - return out_files From 431db81e8c6fd1b85f253b77ab1e3cb74661cbb0 Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 19 Jul 2024 13:57:06 +0100 Subject: [PATCH 10/30] add iblsort environment to the spike sorting task --- ibllib/pipes/ephys_tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 2ff1f2fdd..c9220f9a2 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -564,6 +564,7 @@ class SpikeSorting(base_tasks.EphysTask, CellQCMixin): priority = 60 job_size = 'large' force = True + env = 'iblsorter' SHELL_SCRIPT = Path.home().joinpath( "Documents/PYTHON/iblscripts/deploy/serverpc/iblsorter/sort_recording.sh" From d1aac3faaedecdb3f6c84ac0f7c4c5065cd637f2 Mon Sep 17 00:00:00 2001 From: chris-langfield Date: Fri, 26 Jul 2024 13:06:27 -0400 Subject: [PATCH 11/30] typo --- ibllib/pipes/ephys_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index c9220f9a2..ee5a0c8e6 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -598,7 +598,7 @@ def _fetch_iblsorter_version(repo_path): return f"iblsorter_{iblsorter.__version__}" except ImportError: _logger.info('IBL-sorter not in environment, trying to locate the repository') - init_file = Path(repo_path).joinpath('ibl-sorter', '__init__.py') + init_file = Path(repo_path).joinpath('iblsorter', '__init__.py') try: with open(init_file) as fid: lines = fid.readlines() From 696ff01f023338cd08d482590511f01553d6c436 Mon Sep 17 00:00:00 2001 From: chris-langfield Date: Mon, 29 Jul 2024 14:37:16 -0400 Subject: [PATCH 12/30] test forcing subprocess for large_jobs --- ibllib/pipes/ephys_tasks.py | 47 +++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index ee5a0c8e6..25bb4707a 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -656,32 +656,27 @@ def _run_iblsort(self, ap_file): _logger.info(f"job progress command: tail -f {temp_dir} *.log") temp_dir.mkdir(parents=True, exist_ok=True) check_nvidia_driver() - try: - # if pykilosort is in the environment, use the installed version within the task - import iblsorter.ibl # noqa - iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir) - except ImportError: - command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" - _logger.info(command2run) - process = subprocess.Popen( - command2run, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - executable="/bin/bash", - ) - info, error = process.communicate() - info_str = info.decode("utf-8").strip() - _logger.info(info_str) - if process.returncode != 0: - error_str = error.decode("utf-8").strip() - # try and get the kilosort log if any - for log_file in temp_dir.rglob('*_kilosort.log'): - with open(log_file) as fid: - log = fid.read() - _logger.error(log) - break - raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") + command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" + _logger.info(command2run) + process = subprocess.Popen( + command2run, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + executable="/bin/bash", + ) + info, error = process.communicate() + info_str = info.decode("utf-8").strip() + _logger.info(info_str) + if process.returncode != 0: + error_str = error.decode("utf-8").strip() + # try and get the kilosort log if any + for log_file in temp_dir.rglob('*_kilosort.log'): + with open(log_file) as fid: + log = fid.read() + _logger.error(log) + break + raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True) shutil.rmtree(temp_dir, ignore_errors=True) From 81481d000b3241eee0f5a5722f0279ece92d1e96 Mon Sep 17 00:00:00 2001 From: chris-langfield Date: Tue, 30 Jul 2024 09:49:39 -0400 Subject: [PATCH 13/30] Revert "test forcing subprocess for large_jobs" This reverts commit 31ff95dc5de1899b813fa8cbd948de6d73472364. --- ibllib/pipes/ephys_tasks.py | 47 ++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 25bb4707a..ee5a0c8e6 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -656,27 +656,32 @@ def _run_iblsort(self, ap_file): _logger.info(f"job progress command: tail -f {temp_dir} *.log") temp_dir.mkdir(parents=True, exist_ok=True) check_nvidia_driver() - command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" - _logger.info(command2run) - process = subprocess.Popen( - command2run, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - executable="/bin/bash", - ) - info, error = process.communicate() - info_str = info.decode("utf-8").strip() - _logger.info(info_str) - if process.returncode != 0: - error_str = error.decode("utf-8").strip() - # try and get the kilosort log if any - for log_file in temp_dir.rglob('*_kilosort.log'): - with open(log_file) as fid: - log = fid.read() - _logger.error(log) - break - raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") + try: + # if pykilosort is in the environment, use the installed version within the task + import iblsorter.ibl # noqa + iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir) + except ImportError: + command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" + _logger.info(command2run) + process = subprocess.Popen( + command2run, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + executable="/bin/bash", + ) + info, error = process.communicate() + info_str = info.decode("utf-8").strip() + _logger.info(info_str) + if process.returncode != 0: + error_str = error.decode("utf-8").strip() + # try and get the kilosort log if any + for log_file in temp_dir.rglob('*_kilosort.log'): + with open(log_file) as fid: + log = fid.read() + _logger.error(log) + break + raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True) shutil.rmtree(temp_dir, ignore_errors=True) From b9651e272f9d41f17dfed4fbfbb19cae6f4936c9 Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Wed, 11 Sep 2024 12:04:46 +0100 Subject: [PATCH 14/30] label probe qc if ONE instance --- ibllib/pipes/ephys_tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index ee5a0c8e6..668e72959 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -721,7 +721,7 @@ def _run(self): out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) out_files.extend(out) # Now compute the unit metrics - self.compute_cell_qc(probe_out_path) + _, df_units, drift = self.compute_cell_qc(probe_out_path) # convert ks2_output into tar file and also register # Make this in case spike sorting is in old raw_ephys_data folders, for new # sessions it should already exist @@ -753,6 +753,8 @@ def _run(self): eid = self.one.path2eid(self.session_path, query_type='remote') ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote') if len(ins) != 0: + _logger.info("Populating probe insertion with qc") + self._label_probe_qc(probe_out_path, df_units, drift) _logger.info("Creating SpikeSorting QC plots") plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one) _ = plot_task.run() From 12b722c3a9387a8239de831bf313f2dbf4cc61ea Mon Sep 17 00:00:00 2001 From: owinter Date: Thu, 12 Sep 2024 16:48:54 +0100 Subject: [PATCH 15/30] add passingSpikes.pqt to spike sorting job - update task signature --- ibllib/pipes/ephys_tasks.py | 80 ++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 14 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 668e72959..0a3ced096 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -517,8 +517,22 @@ def compute_cell_qc(folder_alf_probe): df_units = pd.concat( [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1) # save as parquet file - df_units.to_parquet(folder_alf_probe.joinpath("clusters.metrics.pqt")) - return folder_alf_probe.joinpath("clusters.metrics.pqt"), df_units, drift + df_units.to_parquet(file_metrics := folder_alf_probe.joinpath("clusters.metrics.pqt")) + + assert np.all((df_units['bitwise_fail'] == 0) == (df_units['label'] == 1)) # useless but sanity check for OW + + cok = df_units['bitwise_fail'] == 0 + sok = cok[spikes['clusters']].values + spikes['templates'] = spikes['templates'].astype(np.uint16) + spikes['clusters'] = spikes['clusters'].astype(np.uint16) + spikes['depths'] = spikes['depths'].astype(np.float32) + spikes['amps'] = spikes['amps'].astype(np.float32) + file_passing = folder_alf_probe.joinpath('passingSpikes.table.pqt') + df_spikes = pd.DataFrame(spikes) + df_spikes = df_spikes.iloc[sok, :].reset_index(drop=True) + df_spikes.to_parquet(file_passing) + + return [file_metrics, file_passing], df_units, drift def _label_probe_qc(self, folder_probe, df_units, drift): """ @@ -565,23 +579,59 @@ class SpikeSorting(base_tasks.EphysTask, CellQCMixin): job_size = 'large' force = True env = 'iblsorter' - + _sortername = 'iblsorter' SHELL_SCRIPT = Path.home().joinpath( - "Documents/PYTHON/iblscripts/deploy/serverpc/iblsorter/sort_recording.sh" + f"Documents/PYTHON/iblscripts/deploy/serverpc/{_sortername}/sort_recording.sh" ) SPIKE_SORTER_NAME = 'iblsorter' - PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter') + SORTER_REPOSITORY = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter') @property def signature(self): signature = { - 'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True), - ('*ap.*bin', f'{self.device_collection}/{self.pname}', True), - ('*ap.ch', f'{self.device_collection}/{self.pname}', False), - ('*sync.npy', f'{self.device_collection}/{self.pname}', True)], - 'output_files': [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{self.pname}', True), - ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}', True), - ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}', True)] + 'input_files': [ + ('*ap.meta', f'{self.device_collection}/{self.pname}', True), + ('*ap.*bin', f'{self.device_collection}/{self.pname}', True), + ('*ap.ch', f'{self.device_collection}/{self.pname}', False), + ('*sync.npy', f'{self.device_collection}/{self.pname}', True) + ], + 'output_files': [ + # ./raw_ephys_data/probe00/ + ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}/', True), + ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}/', True), + # ./spike_sorters/iblsorter/probe00 + ('spike_sorting_pykilosort.log', f'spike_sorters/{self._sortername}/{self.pname}', True), + ('_kilosort_raw.output.tar', f'spike_sorters/{self._sortername}/{self.pname}/', True), + # ./alf/probe00/iblsorter + ('_kilosort_whitening.matrix.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('_phy_spikes_subset.channels.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('_phy_spikes_subset.spikes.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('_phy_spikes_subset.waveforms.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('channels.labels.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('channels.localCoordinates.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('channels.rawInd.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.amps.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.channels.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.depths.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.metrics.pqt', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.peakToTrough.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.uuids.csv', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.waveforms.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('clusters.waveformsChannels.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('drift.times.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('drift.um.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('drift_depths.um.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('passingSpikes.table.pqt', f'alf/{self.pname}/{self._sortername}/', True), + ('spikes.amps.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('spikes.clusters.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('spikes.depths.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('spikes.samples.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('spikes.templates.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('spikes.times.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('templates.amps.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('templates.waveforms.npy', f'alf/{self.pname}/{self._sortername}/', True), + ('templates.waveformsChannels.npy', f'alf/{self.pname}/{self._sortername}/', True), + ], } return signature @@ -630,7 +680,7 @@ def _run_iblsort(self, ap_file): (discontinued support for old spike sortings in the probe folder <1.5.5) :return: path of the folder containing ks2 spike sorting output """ - self.version = self._fetch_iblsorter_version(self.PYKILOSORT_REPO) + self.version = self._fetch_iblsorter_version(self.SORTER_REPOSITORY) label = ap_file.parts[-2] # this is usually the probe name sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label) self.FORCE_RERUN = False @@ -721,7 +771,9 @@ def _run(self): out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) out_files.extend(out) # Now compute the unit metrics - _, df_units, drift = self.compute_cell_qc(probe_out_path) + files_qc, df_units, drift = self.compute_cell_qc(probe_out_path) + out_files.extend(files_qc) + # convert ks2_output into tar file and also register # Make this in case spike sorting is in old raw_ephys_data folders, for new # sessions it should already exist From 109998941c245f9eb55a66a251298344430fc925 Mon Sep 17 00:00:00 2001 From: owinter Date: Tue, 17 Sep 2024 15:31:25 +0100 Subject: [PATCH 16/30] configure task to decompress cbin beforehand --- ibllib/ephys/sync_probes.py | 2 +- ibllib/pipes/ephys_tasks.py | 82 ++++++++++++++++++++++++------------- ibllib/pipes/tasks.py | 4 +- 3 files changed, 56 insertions(+), 32 deletions(-) diff --git a/ibllib/ephys/sync_probes.py b/ibllib/ephys/sync_probes.py index 3f3411479..54106b245 100644 --- a/ibllib/ephys/sync_probes.py +++ b/ibllib/ephys/sync_probes.py @@ -47,7 +47,7 @@ def sync(ses_path, **kwargs): return version3B(ses_path, **kwargs) -def version3A(ses_path, display=True, type='smooth', tol=2.1): +def version3A(ses_path, display=True, type='smooth', tol=2.1, probe_names=None): """ From a session path with _spikeglx_sync arrays extracted, locate ephys files for 3A and outputs one sync.timestamps.probeN.npy file per acquired probe. By convention the reference diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 0a3ced096..3a40d4d79 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -1,3 +1,4 @@ +import hashlib import logging import traceback from pathlib import Path @@ -355,9 +356,9 @@ class EphysSyncPulses(SyncPulses): @property def signature(self): signature = { - 'input_files': [('*nidq.cbin', self.sync_collection, True), + 'input_files': [('*nidq.cbin', self.sync_collection, False), ('*nidq.ch', self.sync_collection, False), - ('*nidq.meta', self.sync_collection, True), + ('*nidq.meta', self.sync_collection, False), ('*nidq.wiring.json', self.sync_collection, True)], 'output_files': [('_spikeglx_sync.times.npy', self.sync_collection, True), ('_spikeglx_sync.polarities.npy', self.sync_collection, True), @@ -397,9 +398,14 @@ def signature(self): [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] + [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] + [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] + - [('_spikeglx_sync.times.npy', self.sync_collection, True), - ('_spikeglx_sync.polarities.npy', self.sync_collection, True), - ('_spikeglx_sync.channels.npy', self.sync_collection, True)], + [('_spikeglx_sync.times.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.polarities.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.channels.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.times.*npy', self.sync_collection, True), + ('_spikeglx_sync.polarities.*npy', self.sync_collection, True), + ('_spikeglx_sync.channels.*npy', self.sync_collection, True), + ('*ap.meta', self.sync_collection, True) + ], 'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname] + [(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True) @@ -599,6 +605,7 @@ def signature(self): # ./raw_ephys_data/probe00/ ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}/', True), ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}/', True), + ('_iblqc_ephysSaturation.samples.npy', f'{self.device_collection}/{self.pname}/', True), # ./spike_sorters/iblsorter/probe00 ('spike_sorting_pykilosort.log', f'spike_sorters/{self._sortername}/{self.pname}', True), ('_kilosort_raw.output.tar', f'spike_sorters/{self._sortername}/{self.pname}/', True), @@ -635,6 +642,26 @@ def signature(self): } return signature + @property + def _temporary_folder(self): + """ + Constructs a path to a temporary folder for the spike sorting output and scratch files + This is usually on a high performance drive, and we should factor around 2.5 times the uncompressed raw recording size + For a scratch drive at /mnt/h0 we would have the following temp dir: + /mnt/h0/iblsorter_1.8.0_CSHL071_2020-10-04_001_probe01/ + """ + # get the scratch drive from the shell script + with open(self.SHELL_SCRIPT) as fid: + lines = fid.readlines() + line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0] + m = re.search(r"\=(.*?)(\#|\n)", line)[0] + scratch_drive = Path(m[1:-1].strip()) + assert scratch_drive.exists(), f"Scratch drive {scratch_drive} not found" + # get the version of the sorter + self.version = self._fetch_iblsorter_version(self.SORTER_REPOSITORY) + spikesorter_dir = f"{self.version}_{'_'.join(list(self.session_path.parts[-3:]))}_{self.pname}" + return scratch_drive.joinpath(spikesorter_dir) + @staticmethod def _sample2v(ap_file): md = spikeglx.read_meta_data(ap_file.with_suffix(".meta")) @@ -680,9 +707,8 @@ def _run_iblsort(self, ap_file): (discontinued support for old spike sortings in the probe folder <1.5.5) :return: path of the folder containing ks2 spike sorting output """ - self.version = self._fetch_iblsorter_version(self.SORTER_REPOSITORY) - label = ap_file.parts[-2] # this is usually the probe name - sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label) + sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, self.pname) + temp_dir = self._temporary_folder self.FORCE_RERUN = False if not self.FORCE_RERUN: log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") @@ -694,22 +720,13 @@ def _run_iblsort(self, ap_file): return sorter_dir else: self.FORCE_RERUN = True - # get the scratch drive from the shell script - with open(self.SHELL_SCRIPT) as fid: - lines = fid.readlines() - line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0] - m = re.search(r"\=(.*?)(\#|\n)", line)[0] - scratch_drive = Path(m[1:-1].strip()) - assert scratch_drive.exists(), f"Scratch drive {scratch_drive} not found" - spikesorter_dir = f"{self.version}_{'_'.join(list(self.session_path.parts[-3:]))}_{self.pname}" - temp_dir = scratch_drive.joinpath(spikesorter_dir) - _logger.info(f"job progress command: tail -f {temp_dir} *.log") + _logger.info(f"job progress command: tail -f {self._temporary_folder} *.log") temp_dir.mkdir(parents=True, exist_ok=True) check_nvidia_driver() try: # if pykilosort is in the environment, use the installed version within the task import iblsorter.ibl # noqa - iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir) + iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir, delete=False) except ImportError: command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" _logger.info(command2run) @@ -732,10 +749,7 @@ def _run_iblsort(self, ap_file): _logger.error(log) break raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") - shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True) - shutil.rmtree(temp_dir, ignore_errors=True) - return sorter_dir def _run(self): @@ -751,35 +765,44 @@ def _run(self): ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()] assert len(ap_files) != 0, f"No ap file found for probe {self.session_path.joinpath(self.device_collection, self.pname)}" assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}" - ap_file, label = ap_files[0] + ap_file_original, label = ap_files[0] out_files = [] - ks2_dir = self._run_iblsort(ap_file) # runs the sorter, skips if it already ran + sr = spikeglx.Reader(ap_file_original) + if sr.is_mtscomp: + ap_file = sr.decompress_to_scratch(scratch_dir=self._temporary_folder) + else: + ap_file = ap_file_original + sorter_dir = self._run_iblsort(ap_file) # runs the sorter, skips if it already ran + # convert the data to ALF in the ./alf/probeXX/SPIKE_SORTER_NAME folder probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME) shutil.rmtree(probe_out_path, ignore_errors=True) probe_out_path.mkdir(parents=True, exist_ok=True) ibllib.ephys.spikes.ks2_to_alf( - ks2_dir, + sorter_dir, bin_path=ap_file.parent, out_path=probe_out_path, bin_file=ap_file, ampfactor=self._sample2v(ap_file), ) - logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") + logfile = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") if logfile.exists(): shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) + # recover the QC files from the spike sorting output + for file_qc in sorter_dir.rglob('_iblqc_*.npy'): + shutil.copy(file_qc, ap_file_original.parent.joinpath(file_qc.name)) + out_files.append(ap_file_original.parent.joinpath(file_qc.name)) # Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A - out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) + out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file_original, out_path=probe_out_path) out_files.extend(out) # Now compute the unit metrics files_qc, df_units, drift = self.compute_cell_qc(probe_out_path) out_files.extend(files_qc) - # convert ks2_output into tar file and also register # Make this in case spike sorting is in old raw_ephys_data folders, for new # sessions it should already exist tar_dir = self.session_path.joinpath('spike_sorters', self.SPIKE_SORTER_NAME, label) tar_dir.mkdir(parents=True, exist_ok=True) - out = ibllib.ephys.spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN) + out = ibllib.ephys.spikes.ks2_to_tar(sorter_dir, tar_dir, force=self.FORCE_RERUN) out_files.extend(out) # run waveform extraction _logger.info("Running waveform extraction") @@ -801,6 +824,7 @@ def _run(self): wfs_dtype=np.float16, preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"] ) + shutil.rmtree(self._temporary_folder) if self.one: eid = self.one.path2eid(self.session_path, query_type='remote') ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote') diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index 2d0712461..bb9b81a24 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -423,7 +423,7 @@ def assert_expected_outputs(self, raise_error=True): return everything_is_fine, files - def assert_expected_inputs(self, raise_error=True): + def assert_expected_inputs(self, raise_error=True, raise_ambiguous=False): """ Check that all the files necessary to run the task have been are present on disk. @@ -458,7 +458,7 @@ def assert_expected_inputs(self, raise_error=True): for k, v in variant_datasets.items() if any(v)} _logger.error('Ambiguous input datasets found: %s', ambiguous) - if raise_error or self.location == 'sdsc': # take no chances on SDSC + if raise_ambiguous or self.location == 'sdsc': # take no chances on SDSC # This could be mitigated if loading with data OneSDSC raise NotImplementedError( 'Multiple variant datasets found. Loading for these is undefined.') From f3f825c14308648b2be5551da7acb87a4eb76b32 Mon Sep 17 00:00:00 2001 From: owinter Date: Wed, 16 Oct 2024 15:02:26 +0100 Subject: [PATCH 17/30] spike sorting loader with empty sorter name --- brainbox/io/one.py | 12 +++++++----- ibllib/pipes/ephys_tasks.py | 3 ++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/brainbox/io/one.py b/brainbox/io/one.py index c0a21803c..e6a1d2be0 100644 --- a/brainbox/io/one.py +++ b/brainbox/io/one.py @@ -874,7 +874,10 @@ def _get_spike_sorting_collection(self, spike_sorter=None): for sorter in list([spike_sorter, 'iblsorter', 'pykilosort']): if sorter is None: continue - collection = next(filter(lambda c: c == f'alf/{self.pname}/{sorter}', self.collections), None) + if sorter == "": + collection = next(filter(lambda c: c == f'alf/{self.pname}', self.collections), None) + else: + collection = next(filter(lambda c: c == f'alf/{self.pname}/{sorter}', self.collections), None) if collection is not None: return collection # if none is found amongst the defaults, prefers the shortest @@ -987,14 +990,13 @@ def download_raw_waveforms(self, **kwargs): """ _logger.debug(f"loading waveforms from {self.collection}") return self.one.load_object( - self.eid, "waveforms", - attribute=["traces", "templates", "table", "channels"], + id=self.eid, obj="waveforms", attribute=["traces", "templates", "table", "channels"], collection=self._get_spike_sorting_collection("pykilosort"), download_only=True, **kwargs ) def raw_waveforms(self, **kwargs): wf_paths = self.download_raw_waveforms(**kwargs) - return WaveformsLoader(wf_paths[0].parent, wfs_dtype=np.float16) + return WaveformsLoader(wf_paths[0].parent) def load_channels(self, **kwargs): """ @@ -1027,7 +1029,7 @@ def load_channels(self, **kwargs): self.histology = 'alf' return Bunch(channels) - def load_spike_sorting(self, spike_sorter='pykilosort', revision=None, enforce_version=True, good_units=False, **kwargs): + def load_spike_sorting(self, spike_sorter='iblsorter', revision=None, enforce_version=True, good_units=False, **kwargs): """ Loads spikes, clusters and channels diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 3a40d4d79..9f4f7e74f 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -824,7 +824,8 @@ def _run(self): wfs_dtype=np.float16, preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"] ) - shutil.rmtree(self._temporary_folder) + _logger.info(f"Cleaning up temporary folder {self._temporary_folder}") + shutil.rmtree(self._temporary_folder, ignore_errors=True) if self.one: eid = self.one.path2eid(self.session_path, query_type='remote') ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote') From 7443926d2a9976db538105e0af3c6e59f096ffe6 Mon Sep 17 00:00:00 2001 From: owinter Date: Thu, 17 Oct 2024 15:52:04 +0100 Subject: [PATCH 18/30] revert decompressing ap cbin file before sorting --- ibllib/pipes/ephys_tasks.py | 42 +++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 9f4f7e74f..6ed5df174 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -1,4 +1,3 @@ -import hashlib import logging import traceback from pathlib import Path @@ -394,18 +393,19 @@ def __init__(self, *args, **kwargs): @property def signature(self): signature = { - 'input_files': [('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] + - [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] + - [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] + - [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] + - [('_spikeglx_sync.times.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + - [('_spikeglx_sync.polarities.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + - [('_spikeglx_sync.channels.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + - [('_spikeglx_sync.times.*npy', self.sync_collection, True), - ('_spikeglx_sync.polarities.*npy', self.sync_collection, True), - ('_spikeglx_sync.channels.*npy', self.sync_collection, True), - ('*ap.meta', self.sync_collection, True) - ], + 'input_files': + [('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] + + [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] + + [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] + + [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.times.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.polarities.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.channels.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] + + [('_spikeglx_sync.times.*npy', self.sync_collection, True), + ('_spikeglx_sync.polarities.*npy', self.sync_collection, True), + ('_spikeglx_sync.channels.*npy', self.sync_collection, True), + ('*ap.meta', self.sync_collection, True) + ], 'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname] + [(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True) @@ -765,13 +765,8 @@ def _run(self): ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()] assert len(ap_files) != 0, f"No ap file found for probe {self.session_path.joinpath(self.device_collection, self.pname)}" assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}" - ap_file_original, label = ap_files[0] + ap_file, label = ap_files[0] out_files = [] - sr = spikeglx.Reader(ap_file_original) - if sr.is_mtscomp: - ap_file = sr.decompress_to_scratch(scratch_dir=self._temporary_folder) - else: - ap_file = ap_file_original sorter_dir = self._run_iblsort(ap_file) # runs the sorter, skips if it already ran # convert the data to ALF in the ./alf/probeXX/SPIKE_SORTER_NAME folder probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME) @@ -789,10 +784,10 @@ def _run(self): shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) # recover the QC files from the spike sorting output for file_qc in sorter_dir.rglob('_iblqc_*.npy'): - shutil.copy(file_qc, ap_file_original.parent.joinpath(file_qc.name)) - out_files.append(ap_file_original.parent.joinpath(file_qc.name)) + shutil.copy(file_qc, ap_file.parent.joinpath(file_qc.name)) + out_files.append(ap_file.parent.joinpath(file_qc.name)) # Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A - out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file_original, out_path=probe_out_path) + out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) out_files.extend(out) # Now compute the unit metrics files_qc, df_units, drift = self.compute_cell_qc(probe_out_path) @@ -822,7 +817,8 @@ def _run(self): chunksize_samples=int(30_000), n_jobs=None, wfs_dtype=np.float16, - preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"] + preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"], + scratch_dir=self._temporary_folder, ) _logger.info(f"Cleaning up temporary folder {self._temporary_folder}") shutil.rmtree(self._temporary_folder, ignore_errors=True) From c1365cfc9c8f18b40feb8ca53dddfd227ecb69df Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Fri, 18 Oct 2024 13:10:32 +0100 Subject: [PATCH 19/30] s3 patcher --- ibllib/oneibl/patcher.py | 49 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py index 3738d7bcf..85b676938 100644 --- a/ibllib/oneibl/patcher.py +++ b/ibllib/oneibl/patcher.py @@ -633,3 +633,52 @@ def _scp(self, local_path, remote_path, dry=True): def _rm(self, flatiron_path, dry=True): raise PermissionError("This Patcher does not have admin permissions to remove data " "from the FlatIron server") + + +class S3Patcher(Patcher): + + def __init__(self, one=None): + assert one + super().__init__(one=one) + self.s3_repo = self.s3_path = 's3_patcher' + + # Instantiate boto connection + # self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo) + + def check_datasets(self, file_list): + # Here we want to check if the datasets exist, if they do we don't want to patch unless we force. + exists = [] + for file in file_list: + collection = full_path_parts(file, as_dict=True)['collection'] + dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name, + collection=collection) + if len(dset) > 0: + exists.append(file) + + return exists + + def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs): + + exists = self.check_datasets(file_list) + if len(exists) > 0: + # log a warning here that the files already exist, must force + return + + response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs) + # need to patch the file records to be consistent + for ds in response: + frs = ds['file_records'] + fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs)) + # Update the flatiron file record to be false + self.one.alyx.rest('files', 'partial_update', id=fr_server['id'], + data={'exists': False}) + def _scp(self, local_path, remote_path, dry=True): + + aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT)) + # TODO logging to track the progress of uploading the file + #self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path))) + + return 0, '' + + def _rm(self, *args, **kwargs): + raise PermissionError("This Patcher does not have admin permissions to remove data.") From 42c44000d70f609110fe8bd17d21183ad09301e1 Mon Sep 17 00:00:00 2001 From: owinter Date: Fri, 18 Oct 2024 16:54:31 +0100 Subject: [PATCH 20/30] Fix bug in the make sorting plots task signature --- ibllib/pipes/ephys_tasks.py | 2 +- ibllib/plots/figures.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 6ed5df174..e28963ee5 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -782,7 +782,7 @@ def _run(self): logfile = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") if logfile.exists(): shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) - # recover the QC files from the spike sorting output + # recover the QC files from the spike sorting output and copy them for file_qc in sorter_dir.rglob('_iblqc_*.npy'): shutil.copy(file_qc, ap_file.parent.joinpath(file_qc.name)) out_files.append(ap_file.parent.joinpath(file_qc.name)) diff --git a/ibllib/plots/figures.py b/ibllib/plots/figures.py index 384042add..14a6bb554 100644 --- a/ibllib/plots/figures.py +++ b/ibllib/plots/figures.py @@ -18,6 +18,7 @@ import one.alf.io as alfio from one.alf.exceptions import ALFObjectNotFound from ibllib.io.video import get_video_frame, url_from_eid +from ibllib.oneibl.data_handlers import ExpectedDataset import spikeglx import neuropixel from brainbox.plot import driftmap @@ -387,7 +388,6 @@ def get_probe_signature(self): def get_signatures(self, **kwargs): files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy') folder_probes = [f.parent for f in files_spikes] - full_input_files = [] for sig in self.signature['input_files']: for folder in folder_probes: @@ -396,8 +396,9 @@ def get_signatures(self, **kwargs): self.input_files = full_input_files else: self.input_files = self.signature['input_files'] - self.output_files = self.signature['output_files'] + self.input_files = [ExpectedDataset.input(*i) for i in self.input_files] + self.output_files = [ExpectedDataset.output(*i) for i in self.output_files] class BadChannelsAp(ReportSnapshotProbe): From bf77afe177465e4910cb388f4ed395bd5de4f7bb Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Fri, 18 Oct 2024 17:17:33 +0100 Subject: [PATCH 21/30] add ec2 datahandler --- ibllib/oneibl/data_handlers.py | 10 +++++----- ibllib/oneibl/patcher.py | 23 +++++++++++++---------- ibllib/pipes/tasks.py | 2 ++ 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py index ba713babb..cefe949ab 100644 --- a/ibllib/oneibl/data_handlers.py +++ b/ibllib/oneibl/data_handlers.py @@ -21,7 +21,7 @@ from iblutil.util import flatten, ensure_list from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository -from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH +from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher _logger = logging.getLogger(__name__) @@ -747,7 +747,7 @@ def cleanUp(self): os.unlink(file) -class RemoteHttpDataHandler(DataHandler): +class RemoteEC2DataHandler(DataHandler): def __init__(self, session_path, signature, one=None): """ Data handler for running tasks on remote compute node. Will download missing data via http using ONE @@ -774,9 +774,9 @@ def uploadData(self, outputs, version, **kwargs): :return: output info of registered datasets """ versions = super().uploadData(outputs, version) - ftp_patcher = FTPPatcher(one=self.one) - return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, - versions=versions, **kwargs) + s3_patcher = S3Patcher(one=self.one) + return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user, + versions=versions, **kwargs) class RemoteAwsDataHandler(DataHandler): diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py index 85b676938..564b96087 100644 --- a/ibllib/oneibl/patcher.py +++ b/ibllib/oneibl/patcher.py @@ -34,13 +34,13 @@ import globus_sdk import iblutil.io.params as iopar from iblutil.util import ensure_list -from one.alf.files import get_session_path, add_uuid_string +from one.alf.files import get_session_path, add_uuid_string, full_path_parts from one.alf.spec import is_uuid_string, is_uuid from one import params from one.webclient import AlyxClient from one.converters import path_from_dataset from one.remote import globus -from one.remote.aws import url2uri +from one.remote.aws import url2uri, get_s3_from_alyx from ibllib.oneibl.registration import register_dataset @@ -640,10 +640,11 @@ class S3Patcher(Patcher): def __init__(self, one=None): assert one super().__init__(one=one) - self.s3_repo = self.s3_path = 's3_patcher' + self.s3_repo = 's3_patcher' + self.s3_path = 'patcher' # Instantiate boto connection - # self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo) + self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo) def check_datasets(self, file_list): # Here we want to check if the datasets exist, if they do we don't want to patch unless we force. @@ -657,26 +658,28 @@ def check_datasets(self, file_list): return exists - def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs): + def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs): exists = self.check_datasets(file_list) - if len(exists) > 0: - # log a warning here that the files already exist, must force + if len(exists) > 0 and not force: + _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist, to force set force=True') return response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs) - # need to patch the file records to be consistent + # TODO in an ideal case the flatiron filerecord won't be altered when we register this dataset. This requires + # changing the the alyx.data.register_view for ds in response: frs = ds['file_records'] fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs)) # Update the flatiron file record to be false self.one.alyx.rest('files', 'partial_update', id=fr_server['id'], data={'exists': False}) + def _scp(self, local_path, remote_path, dry=True): aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT)) - # TODO logging to track the progress of uploading the file - #self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path))) + _logger.info(f'Transferring file {local_path} to {aws_remote_path}') + self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path))) return 0, '' diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index 61125a635..eb2aaa066 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -528,6 +528,8 @@ def get_data_handler(self, location=None): dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one) elif location == 'popeye': dhandler = data_handlers.PopeyeDataHandler(self, self.session_path, self.signature, one=self.one) + elif location == 'ec2': + dhandler = data_handlers.RemoteEC2DataHandler(self, self.session_path, self.signature, one=self.one) else: raise ValueError(f'Unknown location "{location}"') return dhandler From 50ba7fb478ee0a598887b878582cdfaf43cbf4d1 Mon Sep 17 00:00:00 2001 From: owinter Date: Fri, 18 Oct 2024 18:28:23 +0100 Subject: [PATCH 22/30] update iblsorter task signatures --- ibllib/pipes/ephys_tasks.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index e28963ee5..84bdc75f9 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -607,7 +607,7 @@ def signature(self): ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}/', True), ('_iblqc_ephysSaturation.samples.npy', f'{self.device_collection}/{self.pname}/', True), # ./spike_sorters/iblsorter/probe00 - ('spike_sorting_pykilosort.log', f'spike_sorters/{self._sortername}/{self.pname}', True), + ('spike_sorting_iblsorter.log', f'spike_sorters/{self._sortername}/{self.pname}', True), ('_kilosort_raw.output.tar', f'spike_sorters/{self._sortername}/{self.pname}/', True), # ./alf/probe00/iblsorter ('_kilosort_whitening.matrix.npy', f'alf/{self.pname}/{self._sortername}/', True), @@ -697,7 +697,7 @@ def _fetch_iblsorter_run_version(log_file): line = fid.readline() version = re.search('version (.*), output', line) version = version or re.search('version (.*)', line) # old versions have output, new have a version line - version = re.sub(r'\^\[{2}[0-9]+m', '', version.group(1)) # removes the coloring tags + version = re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', version.group(1)) return version def _run_iblsort(self, ap_file): @@ -783,8 +783,7 @@ def _run(self): if logfile.exists(): shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) # recover the QC files from the spike sorting output and copy them - for file_qc in sorter_dir.rglob('_iblqc_*.npy'): - shutil.copy(file_qc, ap_file.parent.joinpath(file_qc.name)) + for file_qc in ap_file.parent.glob('_iblqc_*.npy'): out_files.append(ap_file.parent.joinpath(file_qc.name)) # Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) From 70a6aa214d4551207f600390d25ef862c61c773a Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Mon, 21 Oct 2024 13:01:41 +0100 Subject: [PATCH 23/30] clobber=True --- ibllib/oneibl/patcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py index 564b96087..22f682df4 100644 --- a/ibllib/oneibl/patcher.py +++ b/ibllib/oneibl/patcher.py @@ -652,7 +652,7 @@ def check_datasets(self, file_list): for file in file_list: collection = full_path_parts(file, as_dict=True)['collection'] dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name, - collection=collection) + collection=collection, clobber=True) if len(dset) > 0: exists.append(file) From ac7fb6dd85965c97e44cc710607f6f7ddd25340e Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Mon, 21 Oct 2024 13:16:10 +0100 Subject: [PATCH 24/30] add make remotehttp --- ibllib/oneibl/data_handlers.py | 32 ++++++++++++++++++++++++++++++++ ibllib/pipes/tasks.py | 2 +- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py index cefe949ab..632866775 100644 --- a/ibllib/oneibl/data_handlers.py +++ b/ibllib/oneibl/data_handlers.py @@ -779,6 +779,38 @@ def uploadData(self, outputs, version, **kwargs): versions=versions, **kwargs) +class RemoteHttpDataHandler(DataHandler): + def __init__(self, session_path, signature, one=None): + """ + Data handler for running tasks on remote compute node. Will download missing data via http using ONE + + :param session_path: path to session + :param signature: input and output file signatures + :param one: ONE instance + """ + super().__init__(session_path, signature, one=one) + + def setUp(self): + """ + Function to download necessary data to run tasks using ONE + :return: + """ + df = super().getData() + self.one._check_filesystem(df) + + def uploadData(self, outputs, version, **kwargs): + """ + Function to upload and register data of completed task via FTP patcher + :param outputs: output files from task to register + :param version: ibllib version + :return: output info of registered datasets + """ + versions = super().uploadData(outputs, version) + ftp_patcher = FTPPatcher(one=self.one) + return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, + versions=versions, **kwargs) + + class RemoteAwsDataHandler(DataHandler): def __init__(self, task, session_path, signature, one=None): """ diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index eb2aaa066..4c71b1914 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -529,7 +529,7 @@ def get_data_handler(self, location=None): elif location == 'popeye': dhandler = data_handlers.PopeyeDataHandler(self, self.session_path, self.signature, one=self.one) elif location == 'ec2': - dhandler = data_handlers.RemoteEC2DataHandler(self, self.session_path, self.signature, one=self.one) + dhandler = data_handlers.RemoteEC2DataHandler(self.session_path, self.signature, one=self.one) else: raise ValueError(f'Unknown location "{location}"') return dhandler From 0e5e43ef40a8d40c028a09963276b192d3117d8e Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 25 Oct 2024 12:00:22 +0100 Subject: [PATCH 25/30] add an optional scratch_folder argument to the task --- brainbox/io/spikeglx.py | 1 + ibllib/oneibl/data_handlers.py | 2 +- ibllib/pipes/ephys_tasks.py | 34 ++++++++++++++++++---------------- ibllib/pipes/tasks.py | 6 ++++-- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py index 9c0618c11..fff72c5f2 100644 --- a/brainbox/io/spikeglx.py +++ b/brainbox/io/spikeglx.py @@ -128,6 +128,7 @@ def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False): self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f"*{self.pname}") meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f"*{self.pname}") cbin_rec = self.one.list_datasets(self.eid, collection=f"*{self.pname}", filename=f'*{typ}.*bin', details=True) + cbin_rec.index = cbin_rec.index.map(lambda x: (self.eid, x)) self.url_cbin = self.one.record2url(cbin_rec)[0] with open(self.file_chunks, 'r') as f: self.chunks = json.load(f) diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py index 632866775..b0c40c735 100644 --- a/ibllib/oneibl/data_handlers.py +++ b/ibllib/oneibl/data_handlers.py @@ -768,7 +768,7 @@ def setUp(self): def uploadData(self, outputs, version, **kwargs): """ - Function to upload and register data of completed task via FTP patcher + Function to upload and register data of completed task via S3 patcher :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index 84bdc75f9..bb339f048 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -643,7 +643,7 @@ def signature(self): return signature @property - def _temporary_folder(self): + def scratch_folder_run(self): """ Constructs a path to a temporary folder for the spike sorting output and scratch files This is usually on a high performance drive, and we should factor around 2.5 times the uncompressed raw recording size @@ -651,11 +651,14 @@ def _temporary_folder(self): /mnt/h0/iblsorter_1.8.0_CSHL071_2020-10-04_001_probe01/ """ # get the scratch drive from the shell script - with open(self.SHELL_SCRIPT) as fid: - lines = fid.readlines() - line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0] - m = re.search(r"\=(.*?)(\#|\n)", line)[0] - scratch_drive = Path(m[1:-1].strip()) + if self.scratch_folder is None: + with open(self.SHELL_SCRIPT) as fid: + lines = fid.readlines() + line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0] + m = re.search(r"\=(.*?)(\#|\n)", line)[0] + scratch_drive = Path(m[1:-1].strip()) + else: + scratch_drive = self.scratch_folder assert scratch_drive.exists(), f"Scratch drive {scratch_drive} not found" # get the version of the sorter self.version = self._fetch_iblsorter_version(self.SORTER_REPOSITORY) @@ -708,7 +711,6 @@ def _run_iblsort(self, ap_file): :return: path of the folder containing ks2 spike sorting output """ sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, self.pname) - temp_dir = self._temporary_folder self.FORCE_RERUN = False if not self.FORCE_RERUN: log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log") @@ -720,15 +722,15 @@ def _run_iblsort(self, ap_file): return sorter_dir else: self.FORCE_RERUN = True - _logger.info(f"job progress command: tail -f {self._temporary_folder} *.log") - temp_dir.mkdir(parents=True, exist_ok=True) + _logger.info(f"job progress command: tail -f {self.scratch_folder_run} *.log") + self.scratch_folder_run.mkdir(parents=True, exist_ok=True) check_nvidia_driver() try: # if pykilosort is in the environment, use the installed version within the task import iblsorter.ibl # noqa - iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir, delete=False) + iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=self.scratch_folder_run, delete=False) except ImportError: - command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}" + command2run = f"{self.SHELL_SCRIPT} {ap_file} {self.scratch_folder_run}" _logger.info(command2run) process = subprocess.Popen( command2run, @@ -743,13 +745,13 @@ def _run_iblsort(self, ap_file): if process.returncode != 0: error_str = error.decode("utf-8").strip() # try and get the kilosort log if any - for log_file in temp_dir.rglob('*_kilosort.log'): + for log_file in self.scratch_folder_run.rglob('*_kilosort.log'): with open(log_file) as fid: log = fid.read() _logger.error(log) break raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}") - shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True) + shutil.copytree(self.scratch_folder_run.joinpath('output'), sorter_dir, dirs_exist_ok=True) return sorter_dir def _run(self): @@ -817,10 +819,10 @@ def _run(self): n_jobs=None, wfs_dtype=np.float16, preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"], - scratch_dir=self._temporary_folder, + scratch_dir=self.scratch_folder_run, ) - _logger.info(f"Cleaning up temporary folder {self._temporary_folder}") - shutil.rmtree(self._temporary_folder, ignore_errors=True) + _logger.info(f"Cleaning up temporary folder {self.scratch_folder_run}") + shutil.rmtree(self.scratch_folder_run, ignore_errors=True) if self.one: eid = self.one.path2eid(self.session_path, query_type='remote') ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote') diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index d592d5c29..14ead0fdd 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -114,7 +114,7 @@ class Task(abc.ABC): env = None # the environment name within which to run the task (NB: the env is not activated automatically!) def __init__(self, session_path, parents=None, taskid=None, one=None, - machine=None, clobber=True, location='server', **kwargs): + machine=None, clobber=True, location='server', scratch_folder=None, **kwargs): """ Base task class :param session_path: session path @@ -125,7 +125,8 @@ def __init__(self, session_path, parents=None, taskid=None, one=None, :param clobber: whether or not to overwrite log on rerun :param location: location where task is run. Options are 'server' (lab local servers'), 'remote' (remote compute node, data required for task downloaded via one), 'AWS' (remote compute node, data required for task downloaded via AWS), - or 'SDSC' (SDSC flatiron compute node) # TODO 'Globus' (remote compute node, data required for task downloaded via Globus) + or 'SDSC' (SDSC flatiron compute node) + :param scratch_folder: optional: Path where to write intermediate temporary data :param args: running arguments """ self.taskid = taskid @@ -141,6 +142,7 @@ def __init__(self, session_path, parents=None, taskid=None, one=None, self.clobber = clobber self.location = location self.plot_tasks = [] # Plotting task/ tasks to create plot outputs during the task + self.scratch_folder = scratch_folder self.kwargs = kwargs @property From 3238a8d81fe3e02f5073f46b93fe18c7175c32b3 Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 25 Oct 2024 15:35:07 +0100 Subject: [PATCH 26/30] bugfix Streamer / ONE 2.10.1: pass the eid --- brainbox/io/spikeglx.py | 3 +-- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py index fff72c5f2..a66f42c26 100644 --- a/brainbox/io/spikeglx.py +++ b/brainbox/io/spikeglx.py @@ -128,8 +128,7 @@ def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False): self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f"*{self.pname}") meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f"*{self.pname}") cbin_rec = self.one.list_datasets(self.eid, collection=f"*{self.pname}", filename=f'*{typ}.*bin', details=True) - cbin_rec.index = cbin_rec.index.map(lambda x: (self.eid, x)) - self.url_cbin = self.one.record2url(cbin_rec)[0] + self.url_cbin = self.one.record2url(cbin_rec, eid=self.eid)[0] with open(self.file_chunks, 'r') as f: self.chunks = json.load(f) self.chunks['chunk_bounds'] = np.array(self.chunks['chunk_bounds']) diff --git a/requirements.txt b/requirements.txt index c6c7427e0..aac6a6704 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ iblatlas>=0.5.3 ibl-neuropixel>=1.0.1 iblutil>=1.13.0 mtscomp>=1.0.1 -ONE-api~=2.9.rc0 +ONE-api>=2.10.1 phylib>=2.6.0 psychofit slidingRP>=1.1.1 # steinmetz lab refractory period metrics From 98fc54a5d1551fcf3f00404bca1c411d710a9387 Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 25 Oct 2024 15:35:35 +0100 Subject: [PATCH 27/30] SpikeSorting task: recover raw data qc files --- ibllib/pipes/ephys_tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ibllib/pipes/ephys_tasks.py b/ibllib/pipes/ephys_tasks.py index bb339f048..8596a1619 100644 --- a/ibllib/pipes/ephys_tasks.py +++ b/ibllib/pipes/ephys_tasks.py @@ -785,8 +785,9 @@ def _run(self): if logfile.exists(): shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log")) # recover the QC files from the spike sorting output and copy them - for file_qc in ap_file.parent.glob('_iblqc_*.npy'): - out_files.append(ap_file.parent.joinpath(file_qc.name)) + for file_qc in sorter_dir.glob('_iblqc_*.npy'): + shutil.move(file_qc, file_qc_out := ap_file.parent.joinpath(file_qc.name)) + out_files.append(file_qc_out) # Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path) out_files.extend(out) From bf8fea55cb99214c278dbd4abbc99328d749fe5d Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 25 Oct 2024 15:47:28 +0100 Subject: [PATCH 28/30] fix task queue test --- ibllib/tests/test_pipes.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ibllib/tests/test_pipes.py b/ibllib/tests/test_pipes.py index 56ef51e68..9383d6dad 100644 --- a/ibllib/tests/test_pipes.py +++ b/ibllib/tests/test_pipes.py @@ -38,8 +38,8 @@ def test_task_queue(self, lab_repo_mock): lab_repo_mock.return_value = 'foo_repo' tasks = [ {'executable': 'ibllib.pipes.mesoscope_tasks.MesoscopePreprocess', 'priority': 80}, - {'executable': 'ibllib.pipes.ephys_tasks.SpikeSorting', 'priority': SpikeSorting.priority}, - {'executable': 'ibllib.pipes.base_tasks.RegisterRawDataTask', 'priority': RegisterRawDataTask.priority} + {'executable': 'ibllib.pipes.ephys_tasks.SpikeSorting', 'priority': SpikeSorting.priority}, # 60 + {'executable': 'ibllib.pipes.base_tasks.RegisterRawDataTask', 'priority': RegisterRawDataTask.priority} # 100 ] alyx = mock.Mock(spec=AlyxClient) alyx.rest.return_value = tasks @@ -49,10 +49,10 @@ def test_task_queue(self, lab_repo_mock): self.assertIn('foolab', alyx.rest.call_args.kwargs.get('django', '')) self.assertIn('foo_repo', alyx.rest.call_args.kwargs.get('django', '')) # Expect to return tasks in descending priority order, without mesoscope task (different env) - self.assertEqual([tasks[2], tasks[1]], queue) + self.assertEqual([tasks[2]], queue) # Expect only mesoscope task returned when relevant env passed - queue = local_server.task_queue(lab='foolab', alyx=alyx, env=('suite2p',)) - self.assertEqual([tasks[0]], queue) + queue = local_server.task_queue(lab='foolab', alyx=alyx, env=('suite2p', 'iblsorter')) + self.assertEqual([tasks[0], tasks[1]], queue) # Expect no tasks as mesoscope task is a large job queue = local_server.task_queue(mode='small', lab='foolab', alyx=alyx, env=('suite2p',)) self.assertEqual([], queue) From b67d23a0b13ff7aab98dd26a0817d209f995f4fd Mon Sep 17 00:00:00 2001 From: olivier Date: Fri, 25 Oct 2024 17:19:48 +0100 Subject: [PATCH 29/30] fix streamer with one 2.10 --- brainbox/io/spikeglx.py | 3 ++- requirements.txt | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py index a66f42c26..fff72c5f2 100644 --- a/brainbox/io/spikeglx.py +++ b/brainbox/io/spikeglx.py @@ -128,7 +128,8 @@ def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False): self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f"*{self.pname}") meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f"*{self.pname}") cbin_rec = self.one.list_datasets(self.eid, collection=f"*{self.pname}", filename=f'*{typ}.*bin', details=True) - self.url_cbin = self.one.record2url(cbin_rec, eid=self.eid)[0] + cbin_rec.index = cbin_rec.index.map(lambda x: (self.eid, x)) + self.url_cbin = self.one.record2url(cbin_rec)[0] with open(self.file_chunks, 'r') as f: self.chunks = json.load(f) self.chunks['chunk_bounds'] = np.array(self.chunks['chunk_bounds']) diff --git a/requirements.txt b/requirements.txt index aac6a6704..31cdb0898 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ iblatlas>=0.5.3 ibl-neuropixel>=1.0.1 iblutil>=1.13.0 mtscomp>=1.0.1 -ONE-api>=2.10.1 +ONE-api>=2.10 phylib>=2.6.0 psychofit slidingRP>=1.1.1 # steinmetz lab refractory period metrics From 32c80de70c547e29e4b8a206c65b39e0ea6b8ec3 Mon Sep 17 00:00:00 2001 From: owinter Date: Mon, 28 Oct 2024 15:44:11 +0000 Subject: [PATCH 30/30] spike sorting loading: relax the version until the BWM is patched --- brainbox/io/one.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/brainbox/io/one.py b/brainbox/io/one.py index e6a1d2be0..ee55ea9d0 100644 --- a/brainbox/io/one.py +++ b/brainbox/io/one.py @@ -1029,7 +1029,7 @@ def load_channels(self, **kwargs): self.histology = 'alf' return Bunch(channels) - def load_spike_sorting(self, spike_sorter='iblsorter', revision=None, enforce_version=True, good_units=False, **kwargs): + def load_spike_sorting(self, spike_sorter='iblsorter', revision=None, enforce_version=False, good_units=False, **kwargs): """ Loads spikes, clusters and channels