|
| 1 | +""" copy a folder from local to gcp """ |
| 2 | +import logging |
| 3 | +from concurrent import futures |
| 4 | + |
| 5 | +from nowcasting_dataset.filesystem.utils import get_all_filenames_in_path, upload_one_file |
| 6 | + |
logging.basicConfig()
_LOG = logging.getLogger("nowcasting_dataset")
_LOG.setLevel(logging.DEBUG)

# Sub-folders that are walked: one folder per set, one sub-folder per data source.
sets = ["train", "validation", "test"]
data_sources = ["gsp", "metadata", "nwp", "pv", "satellite", "sun", "topographic"]

# Remote destination root and local source root; both share the
# <set>/<data_source>/<file> layout built in the loop below.
GCP_PATH = "gs://solar-pv-nowcasting-data/prepared_ML_training_data/v10"
LOCAL_PATH = (
    "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/"
    "prepared_ML_training_data/v10"
)

# Maps every local file that still has to be uploaded to its remote destination.
all_filenames = {}
for dset in sets:
    for data_source in data_sources:
        local_dir = f"{LOCAL_PATH}/{dset}/{data_source}"
        gsp_dir = f"{GCP_PATH}/{dset}/{data_source}"
        files = sorted(get_all_filenames_in_path(local_dir))

        # List what is already in the remote folder. A failure here is treated as
        # "nothing uploaded yet" (best effort), but it is logged so that a real
        # problem (credentials, network, ...) is not hidden.
        try:
            gsp_files_already = get_all_filenames_in_path(gsp_dir)
        except Exception:
            _LOG.warning(
                "Could not list %s, assuming it contains no files", gsp_dir, exc_info=True
            )
            gsp_files_already = []

        # only keep .nc files
        filenames = [file for file in files if ".nc" in file]
        gsp_files_already = [file for file in gsp_files_already if ".nc" in file]
        print(f"Already {len(gsp_files_already)} in gsp folder already: {gsp_dir}")

        # Drop files that are already in the remote folder. The comparison key has
        # the "gs://" scheme stripped -- presumably the remote listing returns
        # paths without it (verify against get_all_filenames_in_path). A set gives
        # O(1) lookups instead of scanning the remote listing for every local file.
        already_uploaded = set(gsp_files_already)
        remote_prefix = gsp_dir.replace("gs://", "")
        filenames = [
            file
            for file in filenames
            if f'{remote_prefix}/{file.split("/")[-1]}' not in already_uploaded
        ]
        print(f"There are {len(filenames)} to upload")

        # local path -> remote path, for this set / data source
        files_dict = {file: f'{gsp_dir}/{file.split("/")[-1]}' for file in filenames}
        all_filenames.update(files_dict)
| 48 | + |
| 49 | + |
def one_file(local_file: str, gsp_file: str, *, min_file_index: int = -1) -> None:
    """Copy one file from local to gsp.

    The file index is parsed from the last six characters of the file name
    (the part of the base name before the first "."), e.g. ".../000123.nc" -> 123.
    Only the base name is inspected, so dots in directory names (e.g. "v1.0")
    do not break the parsing.

    Args:
        local_file: path of the local file to upload.
        gsp_file: destination path of the file in the remote bucket.
        min_file_index: the file is only uploaded when its index is strictly
            greater than this value. Raise it to resume a copy after a certain
            file number. The default of -1 uploads every non-negative index.

    Raises:
        ValueError: if the last six characters of the file name stem are not
            an integer.
    """
    base_name = local_file.split("/")[-1]
    file_index = int(base_name.split(".")[0][-6:])
    if file_index > min_file_index:
        print(gsp_file)
        upload_one_file(remote_filename=gsp_file, local_filename=local_file, overwrite=False)
| 57 | + |
| 58 | + |
# Upload everything collected in all_filenames (local path -> remote path).
future_examples_per_source = []
if not all_filenames:
    # Nothing left to copy: skip the smoke test instead of failing on an empty dict.
    _LOG.info("No files to upload")
else:
    # test to see if it works: upload the first file synchronously, so that a
    # broken setup raises here, before any work is handed to the thread pool
    first_local_file, first_gsp_file = next(iter(all_filenames.items()))
    one_file(first_local_file, first_gsp_file)

    # loop over files
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Submit tasks to the executor, remembering which file each task handles.
        local_file_per_task = {}
        for k, v in all_filenames.items():
            task = executor.submit(one_file, local_file=k, gsp_file=v)
            future_examples_per_source.append(task)
            local_file_per_task[task] = k

        # An exception raised inside a worker is stored on its future and is
        # lost unless it is asked for, so report every failed upload here.
        for task in futures.as_completed(future_examples_per_source):
            error = task.exception()
            if error is not None:
                _LOG.error("Failed to upload %s: %r", local_file_per_task[task], error)
0 commit comments