Description
Hi! In maybe_set_aws_credentials, the AWS_S3_ALLOW_UNSAFE_RENAME option is set to true. This causes the following call, made with no credentials provided, to fail:
import dask_deltatable as ddt
ddf = ddt.read_deltalake("s3://my/deltatable", storage_options={})
ddf.head(n=10)
This call currently fails with the following error (full traceback at end):
File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:526, in S3FileSystem.set_session(self, refresh, kwargs)
524 conf = AioConfig(**config_kwargs)
525 if self.session is None:
--> 526 self.session = aiobotocore.session.AioSession(**self.kwargs)
528 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
529 for option in ("region_name", "endpoint_url"):
TypeError: AioSession.__init__() got an unexpected keyword argument 'AWS_S3_ALLOW_UNSAFE_RENAME'
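For what it's worth, I think this can be reproduced without Delta Lake at all: judging from the frame above, s3fs stashes unrecognized constructor kwargs on self.kwargs and later splats them into AioSession. A minimal sketch (the bucket/key is a placeholder):

import s3fs

# Any unknown constructor kwarg ends up in S3FileSystem.kwargs and is later
# passed to aiobotocore.session.AioSession(**self.kwargs), so this should
# raise the same TypeError as above.
fs = s3fs.S3FileSystem(AWS_S3_ALLOW_UNSAFE_RENAME="true")
fs.info("s3://my/deltatable")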
The read_deltalake call should succeed, since I'm executing it on an EC2 instance that has an IAM role configured with permissions to my bucket. I've confirmed I can read this same table with polars:
import polars as pl
df = pl.read_delta("s3://my/deltatable")
df.head()
I am able to work around this temporarily by using the path s3a://my/deltatable, since maybe_set_aws_credentials explicitly checks for the s3:// prefix (is this also a bug? it should probably check for s3a:// and s3n:// too?), but I'm pretty sure the underlying bug is in how these credentials eventually make their way to pyarrow. I've included my package versions and full traceback below. If this is indeed a bug, I'm happy to try to write a fix; a rough sketch of what I have in mind follows.
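To be clear, the helper names and the set of Delta-only keys below are placeholders I made up for illustration, not existing dask-deltatable APIs:

# Hypothetical sketch: strip deltalake-only options before they reach the
# fsspec/s3fs filesystem, and accept all common S3 URI schemes.

# Keys consumed by deltalake itself; s3fs/aiobotocore do not accept them.
DELTA_ONLY_KEYS = {"AWS_S3_ALLOW_UNSAFE_RENAME"}

def split_storage_options(storage_options: dict) -> tuple[dict, dict]:
    """Return (delta_options, filesystem_options)."""
    delta_opts = {k: v for k, v in storage_options.items() if k in DELTA_ONLY_KEYS}
    fs_opts = {k: v for k, v in storage_options.items() if k not in DELTA_ONLY_KEYS}
    return delta_opts, fs_opts

def is_s3_uri(path: str) -> bool:
    # Treat s3a:// and s3n:// the same as s3://.
    return path.startswith(("s3://", "s3a://", "s3n://"))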
Traceback
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[21], line 1
----> 1 ddf.head(n=10)

File /opt/tljh/user/lib/python3.12/site-packages/dask_expr/_collection.py:706, in FrameBase.head(self, n, npartitions, compute)
    704 out = new_collection(expr.Head(self, n=n, npartitions=npartitions))
    705 if compute:
--> 706     out = out.compute()
    707 return out

File /opt/tljh/user/lib/python3.12/site-packages/dask_expr/_collection.py:480, in FrameBase.compute(self, fuse, concatenate, **kwargs)
    478     out = out.repartition(npartitions=1)
    479 out = out.optimize(fuse=fuse)
--> 480 return DaskMethodsMixin.compute(out, **kwargs)

File /opt/tljh/user/lib/python3.12/site-packages/dask/base.py:372, in DaskMethodsMixin.compute(self, **kwargs)
    348 def compute(self, **kwargs):
    349     """Compute this dask collection
    350
    351     This turns a lazy Dask collection into its in-memory equivalent.
    (...)
    370     dask.compute
    371     """
--> 372 (result,) = compute(self, traverse=False, **kwargs)
    373 return result

File /opt/tljh/user/lib/python3.12/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    657     postcomputes.append(x.__dask_postcompute__())
    659 with shorten_traceback():
--> 660     results = schedule(dsk, keys, **kwargs)
    662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/tljh/user/lib/python3.12/site-packages/dask_deltatable/core.py:76, in _read_delta_partition(filename, schema, fs, columns, filter, pyarrow_to_pandas, **_kwargs)
     70 pyarrow_to_pandas["types_mapper"] = _get_type_mapper(
     71     pyarrow_to_pandas.get("types_mapper")
     72 )
     73 pyarrow_to_pandas["ignore_metadata"] = pyarrow_to_pandas.get(
     74     "ignore_metadata", False
     75 )
---> 76 table = pa_ds.dataset(
     77     source=filename,
     78     schema=schema,
     79     filesystem=fs,
     80     format="parquet",
     81     partitioning="hive",
     82 ).to_table(filter=filter_expression, columns=columns)
     83 return table.to_pandas(**pyarrow_to_pandas)

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/dataset.py:794, in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
    783 kwargs = dict(
    784     schema=schema,
    785     filesystem=filesystem,
    (...)
    790     selector_ignore_prefixes=ignore_prefixes
    791 )
    793 if _is_path_like(source):
--> 794     return _filesystem_dataset(source, **kwargs)
    795 elif isinstance(source, (tuple, list)):
    796     if all(_is_path_like(elem) or isinstance(elem, FileInfo) for elem in source):

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/dataset.py:476, in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
    474     fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
    475 else:
--> 476     fs, paths_or_selector = _ensure_single_source(source, filesystem)
    478 options = FileSystemFactoryOptions(
    479     partitioning=partitioning,
    480     partition_base_dir=partition_base_dir,
    481     exclude_invalid_files=exclude_invalid_files,
    482     selector_ignore_prefixes=selector_ignore_prefixes
    483 )
    484 factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/dataset.py:432, in _ensure_single_source(path, filesystem)
    429 path = filesystem.normalize_path(path)
    431 # retrieve the file descriptor
--> 432 file_info = filesystem.get_file_info(path)
    434 # depending on the path type either return with a recursive
    435 # directory selector or as a list containing a single file
    436 if file_info.type == FileType.Directory:

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/_fs.pyx:590, in pyarrow._fs.FileSystem.get_file_info()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/error.pxi:155, in pyarrow.lib.pyarrow_internal_check_status()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/error.pxi:89, in pyarrow.lib.check_status()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/_fs.pyx:1498, in pyarrow._fs._cb_get_file_info()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/fs.py:322, in FSSpecHandler.get_file_info(self, paths)
    320 for path in paths:
    321     try:
--> 322         info = self.fs.info(path)
    323     except FileNotFoundError:
    324         infos.append(FileInfo(path, FileType.NotFound))

File /opt/tljh/user/lib/python3.12/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    115 @functools.wraps(func)
    116 def wrapper(*args, **kwargs):
    117     self = obj or args[0]
--> 118     return sync(self.loop, func, *args, **kwargs)

File /opt/tljh/user/lib/python3.12/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
    101     raise FSTimeoutError from return_result
    102 elif isinstance(return_result, BaseException):
--> 103     raise return_result
    104 else:
    105     return return_result

File /opt/tljh/user/lib/python3.12/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
     54     coro = asyncio.wait_for(coro, timeout=timeout)
     55 try:
---> 56     result[0] = await coro
     57 except Exception as ex:
     58     result[0] = ex

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:1426, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
   1424 if key:
   1425     try:
-> 1426         out = await self._call_s3(
   1427             "head_object",
   1428             self.kwargs,
   1429             Bucket=bucket,
   1430             Key=key,
   1431             **version_id_kw(version_id),
   1432             **self.req_kw,
   1433         )
   1434         return {
   1435             "ETag": out.get("ETag", ""),
   1436             "LastModified": out.get("LastModified", ""),
   (...)
   1442             "ContentType": out.get("ContentType"),
   1443         }
   1444     except FileNotFoundError:

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:364, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    363 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 364     await self.set_session()
    365     s3 = await self.get_s3(kwargs.get("Bucket"))
    366     method = getattr(s3, method)

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:526, in S3FileSystem.set_session(self, refresh, kwargs)
    524 conf = AioConfig(**config_kwargs)
    525 if self.session is None:
--> 526     self.session = aiobotocore.session.AioSession(**self.kwargs)
    528 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
    529     for option in ("region_name", "endpoint_url"):

TypeError: AioSession.__init__() got an unexpected keyword argument 'AWS_S3_ALLOW_UNSAFE_RENAME'
Package Versions
Deltalake: 0.23.2
Dask-deltatable: 0.3.3
Pyarrow: 18.1.0