3
3
from pathlib import Path
4
4
import time
5
5
import json
6
+ import string
7
+ import random
6
8
7
9
import numpy as np
8
10
from one .alf .io import remove_uuid_file
@@ -141,12 +143,11 @@ def read(self, nsel=slice(0, 10000), csel=slice(None), sync=True, volts=True):
141
143
n0 = self .chunks ['chunk_bounds' ][first_chunk ]
142
144
_logger .debug (f'Streamer: caching sample { n0 } , (t={ n0 / self .fs } )' )
143
145
self .cache_folder .mkdir (exist_ok = True , parents = True )
144
- sr = self ._download_raw_partial (first_chunk = first_chunk , last_chunk = last_chunk )
146
+ sr , file_cbin = self ._download_raw_partial (first_chunk = first_chunk , last_chunk = last_chunk )
145
147
if not volts :
146
148
data = np .copy (sr ._raw [nsel .start - n0 :nsel .stop - n0 , csel ])
147
149
else :
148
150
data = sr [nsel .start - n0 : nsel .stop - n0 , csel ]
149
-
150
151
sr .close ()
151
152
if self .remove_cached :
152
153
shutil .rmtree (self .target_dir )
@@ -159,16 +160,20 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
159
160
:param first_chunk:
160
161
:param last_chunk:
161
162
:return: spikeglx.Reader of the current chunk, Pathlib.Path of the directory where it is stored
163
+ :return: cbin local path
162
164
"""
163
165
assert str (self .url_cbin ).endswith ('.cbin' )
164
166
webclient = self .one .alyx
165
167
relpath = Path (self .url_cbin .replace (webclient ._par .HTTP_DATA_SERVER , '.' )).parents [0 ]
166
168
# write the temp file into a subdirectory
167
169
tdir_chunk = f"chunk_{ str (first_chunk ).zfill (6 )} _to_{ str (last_chunk ).zfill (6 )} "
168
- target_dir = Path (self .cache_folder , relpath , tdir_chunk )
169
- self .target_dir = target_dir
170
- Path (target_dir ).mkdir (parents = True , exist_ok = True )
171
- ch_file_stream = target_dir .joinpath (self .file_chunks .name ).with_suffix ('.stream.ch' )
170
+ # for parallel processes, there is a risk of collisions if the remove_cached flag is set to True
171
+ # if the folder is to be removed, append a unique identifier to avoid having duplicate names
172
+ if self .remove_cached :
173
+ tdir_chunk += '' .join ([random .choice (string .ascii_letters ) for _ in np .arange (10 )])
174
+ self .target_dir = Path (self .cache_folder , relpath , tdir_chunk )
175
+ Path (self .target_dir ).mkdir (parents = True , exist_ok = True )
176
+ ch_file_stream = self .target_dir .joinpath (self .file_chunks .name ).with_suffix ('.stream.ch' )
172
177
173
178
# Get the first sample index, and the number of samples to download.
174
179
i0 = self .chunks ['chunk_bounds' ][first_chunk ]
@@ -186,7 +191,7 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
186
191
cmeta_stream = json .load (f )
187
192
if (cmeta_stream .get ('chopped_first_sample' , None ) == i0 and
188
193
cmeta_stream .get ('chopped_total_samples' , None ) == total_samples ):
189
- return spikeglx .Reader (ch_file_stream .with_suffix ('.cbin' ), ignore_warnings = True )
194
+ return spikeglx .Reader (ch_file_stream .with_suffix ('.cbin' ), ignore_warnings = True ), ch_file_stream
190
195
191
196
else :
192
197
shutil .copy (self .file_chunks , ch_file_stream )
@@ -224,7 +229,7 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
224
229
try :
225
230
cbin_local_path = webclient .download_file (
226
231
self .url_cbin , chunks = (first_byte , n_bytes ),
227
- target_dir = target_dir , clobber = True , return_md5 = False )
232
+ target_dir = self . target_dir , clobber = True , return_md5 = False )
228
233
break
229
234
except Exception as e :
230
235
retries += 1
@@ -238,4 +243,4 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
238
243
assert cbin_local_path_renamed .exists ()
239
244
240
245
reader = spikeglx .Reader (cbin_local_path_renamed , ignore_warnings = True )
241
- return reader
246
+ return reader , cbin_local_path
0 commit comments