
Commit d912021

Merge pull request #131 from awslabs/read-fixed-width
Add Pandas.read_fwf(), Pandas.read_fwf_list() and Pandas.read_fwf_prefix()
2 parents 3915f44 + 3fc8edc commit d912021

5 files changed: +165 −8 lines

awswrangler/pandas.py

Lines changed: 133 additions & 6 deletions
@@ -237,7 +237,7 @@ def _find_terminator(body, sep, quoting, quotechar, lineterminator):

     @staticmethod
     def _read_csv_once(session_primitives: "SessionPrimitives", bucket_name: str, key_path: str,
-                       **pd_additional_kwargs):
+                       **pd_additional_kwargs) -> pd.DataFrame:
         """
         Read a single CSV file from Amazon S3 using optimized strategies.

@@ -256,7 +256,7 @@ def _read_csv_once(session_primitives: "SessionPrimitives", bucket_name: str, key_path: str,
         if pd_additional_kwargs.get('compression', 'infer') == 'infer':
             pd_additional_kwargs['compression'] = infer_compression(key_path, compression='infer')

-        dataframe = pd.read_csv(buff, **pd_additional_kwargs)
+        dataframe: pd.DataFrame = pd.read_csv(buff, **pd_additional_kwargs)
         buff.close()
         return dataframe

@@ -1613,16 +1613,18 @@ def read_csv_list(
         logger.debug(f"procs_cpu_bound: {procs_cpu_bound}")
         session_primitives = self._session.primitives
         if len(paths) == 1:
-            path = paths[0]
+            path: str = paths[0]
+            bucket_name: str
+            key_path: str
             bucket_name, key_path = Pandas._parse_path(path)
             logger.debug(f"path: {path}")
             df: pd.DataFrame = self._read_csv_once(session_primitives=self._session.primitives,
                                                    bucket_name=bucket_name,
                                                    key_path=key_path,
                                                    **pd_additional_kwargs)
         else:
-            procs = []
-            receive_pipes = []
+            procs: list = []
+            receive_pipes: list = []
             logger.debug(f"len(paths): {len(paths)}")
             for path in paths:
                 receive_pipe, send_pipe = mp.Pipe()
@@ -1639,7 +1641,7 @@ def read_csv_list(
             dfs: List[pd.DataFrame] = []
             for i in range(len(procs)):
                 logger.debug(f"Waiting pipe number: {i}")
-                df_received = receive_pipes[i].recv()
+                df_received: pd.DataFrame = receive_pipes[i].recv()
                 dfs.append(df_received)
                 logger.debug(f"Waiting proc number: {i}")
                 procs[i].join()
@@ -1689,3 +1691,128 @@ def read_csv_prefix(
                                    max_result_size=max_result_size,
                                    procs_cpu_bound=procs_cpu_bound,
                                    **pd_additional_kwargs)
+
+    @staticmethod
+    def _read_fwf(session_primitives: "SessionPrimitives", bucket_name: str, key_path: str, **pd_additional_kwargs) -> pd.DataFrame:
+        """
+        Read a single fixed-width formatted file from Amazon S3 using optimized strategies.
+
+        :param session_primitives: SessionPrimitives()
+        :param bucket_name: S3 bucket name
+        :param key_path: S3 key path (w/o bucket)
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas Dataframe
+        """
+        buff = BytesIO()
+        session: Session = session_primitives.session
+        client_s3 = session.boto3_session.client(service_name="s3", use_ssl=True, config=session.botocore_config)
+        client_s3.download_fileobj(Bucket=bucket_name, Key=key_path, Fileobj=buff)
+        buff.seek(0)
+        if pd_additional_kwargs.get('compression', 'infer') == 'infer':
+            pd_additional_kwargs['compression'] = infer_compression(key_path, compression='infer')
+        dataframe: pd.DataFrame = pd.read_fwf(buff, **pd_additional_kwargs)
+        buff.close()
+        return dataframe
+
+    @staticmethod
+    def _read_fwf_remote(send_pipe: mp.connection.Connection, session_primitives: "SessionPrimitives",
+                         bucket_name: str, key_path: str, **pd_additional_kwargs):
+        df: pd.DataFrame = Pandas._read_fwf(session_primitives=session_primitives,
+                                            bucket_name=bucket_name,
+                                            key_path=key_path,
+                                            **pd_additional_kwargs)
+        send_pipe.send(df)
+        send_pipe.close()
+
+    def read_fwf(self, path: str, **pd_additional_kwargs) -> pd.DataFrame:
+        """
+        Read a single fixed-width formatted file from Amazon S3 using optimized strategies.
+
+        :param path: Amazon S3 path (e.g. s3://bucket_name/key_name)
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas Dataframe
+        """
+        bucket_name, key_path = self._parse_path(path)
+        dataframe: pd.DataFrame = self._read_fwf(
+            session_primitives=self._session.primitives,
+            bucket_name=bucket_name,
+            key_path=key_path,
+            **pd_additional_kwargs)
+        return dataframe
+
+    def read_fwf_list(
+            self,
+            paths: List[str],
+            procs_cpu_bound: Optional[int] = None,
+            **pd_additional_kwargs,
+    ) -> pd.DataFrame:
+        """
+        Read a list of fixed-width formatted files from Amazon S3 using optimized strategies.
+
+        :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2'])
+        :param procs_cpu_bound: Number of cores used for CPU bound tasks
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas Dataframe
+        """
+        procs_cpu_bound = procs_cpu_bound if procs_cpu_bound is not None else self._session.procs_cpu_bound if self._session.procs_cpu_bound is not None else 1
+        logger.debug(f"procs_cpu_bound: {procs_cpu_bound}")
+        session_primitives = self._session.primitives
+        if len(paths) == 1:
+            path: str = paths[0]
+            bucket_name: str
+            key_path: str
+            bucket_name, key_path = Pandas._parse_path(path)
+            logger.debug(f"path: {path}")
+            df: pd.DataFrame = self._read_fwf(session_primitives=self._session.primitives,
+                                              bucket_name=bucket_name,
+                                              key_path=key_path,
+                                              **pd_additional_kwargs)
+        else:
+            procs: list = []
+            receive_pipes: list = []
+            logger.debug(f"len(paths): {len(paths)}")
+            for path in paths:
+                receive_pipe, send_pipe = mp.Pipe()
+                bucket_name, key_path = Pandas._parse_path(path)
+                logger.debug(f"launching path: {path}")
+                proc = mp.Process(target=self._read_fwf_remote,
+                                  args=(send_pipe, session_primitives, bucket_name, key_path),
+                                  kwargs=pd_additional_kwargs)
+                proc.daemon = False
+                proc.start()
+                procs.append(proc)
+                receive_pipes.append(receive_pipe)
+                utils.wait_process_release(processes=procs, target_number=procs_cpu_bound)
+            dfs: List[pd.DataFrame] = []
+            for i in range(len(procs)):
+                logger.debug(f"Waiting pipe number: {i}")
+                df_received: pd.DataFrame = receive_pipes[i].recv()
+                dfs.append(df_received)
+                logger.debug(f"Waiting proc number: {i}")
+                procs[i].join()
+                logger.debug(f"Closing proc number: {i}")
+                receive_pipes[i].close()
+            logger.debug(f"Concatenating all {len(paths)} DataFrames...")
+            df = pd.concat(objs=dfs, ignore_index=True, sort=False)
+            logger.debug("Concatenation done!")
+        return df
+
+    def read_fwf_prefix(
+            self,
+            path_prefix: str,
+            procs_cpu_bound: Optional[int] = None,
+            **pd_additional_kwargs,
+    ) -> pd.DataFrame:
+        """
+        Read all fixed-width formatted files from a given Amazon S3 prefix using optimized strategies.
+
+        :param path_prefix: Amazon S3 prefix (e.g. s3://bucket_name/prefix)
+        :param procs_cpu_bound: Number of cores used for CPU bound tasks
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas Dataframe
+        """
+        paths: List[str] = self._session.s3.list_objects(path=path_prefix)
+        paths = [p for p in paths if not p.endswith("/")]
+        return self.read_fwf_list(paths=paths,
+                                  procs_cpu_bound=procs_cpu_bound,
+                                  **pd_additional_kwargs)
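
Taken together, the three public methods mirror the existing read_csv(), read_csv_list() and read_csv_prefix() family. A minimal usage sketch (the bucket and key names below are hypothetical; widths and names are ordinary pandas.read_fwf keyword arguments forwarded through **pd_additional_kwargs):

    import awswrangler as wr

    # Single object.
    df = wr.pandas.read_fwf(path="s3://my-bucket/data/file1.txt",
                            widths=[1, 12, 8],
                            names=["id", "name", "date"])

    # Explicit list of objects, fanned out across worker processes.
    df = wr.pandas.read_fwf_list(paths=["s3://my-bucket/data/file1.txt",
                                        "s3://my-bucket/data/file2.txt"],
                                 widths=[1, 12, 8],
                                 names=["id", "name", "date"])

    # Every non-directory object under a prefix, concatenated into one DataFrame.
    df = wr.pandas.read_fwf_prefix(path_prefix="s3://my-bucket/data/",
                                   widths=[1, 12, 8],
                                   names=["id", "name", "date"])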

awswrangler/session.py

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
-import os
 import importlib
+from os import cpu_count
 from typing import Optional, Dict
 from sys import version_info
 from logging import getLogger, Logger
@@ -95,7 +95,7 @@ def __init__(self,
         self._s3_additional_kwargs: Optional[Dict[str, str]] = s3_additional_kwargs
         self._spark_context = spark_context
         self._spark_session = spark_session
-        cpus: Optional[int] = os.cpu_count()
+        cpus: Optional[int] = cpu_count()
         self._procs_cpu_bound: int = 1 if cpus is None else cpus if procs_cpu_bound is None else procs_cpu_bound
         self._procs_io_bound: int = 1 if cpus is None else cpus * Session.PROCS_IO_BOUND_FACTOR if procs_io_bound is None else procs_io_bound
         self._athena_workgroup: str = athena_workgroup
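
The one-line conditional expressions that derive the process counts are dense. A spelled-out equivalent of the procs_cpu_bound line, as a sketch for illustration only (resolve_procs_cpu_bound is a hypothetical helper, not part of the commit):

    from os import cpu_count
    from typing import Optional

    def resolve_procs_cpu_bound(procs_cpu_bound: Optional[int]) -> int:
        # Equivalent to:
        # 1 if cpus is None else cpus if procs_cpu_bound is None else procs_cpu_bound
        cpus: Optional[int] = cpu_count()  # may be None when the count is undetermined
        if cpus is None:
            return 1              # no CPU information: run single-process
        if procs_cpu_bound is None:
            return cpus           # default: one worker per core
        return procs_cpu_bound    # explicit user setting wins

Note that when cpu_count() returns None the expression yields 1 even if the caller passed an explicit value, since Python parses it as 1 if cpus is None else (...).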

data_samples/fwf_nano.txt

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+1 Herfelingen27-12-18
+2   Lambusart14-06-18
+3Spormaggiore15-04-18
+4   Buizingen05-09-19
+5  San Rafael04-09-19
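
The sample encodes three fixed-width columns: a 1-character id, a 12-character right-aligned name and an 8-character date, which is why the tests below pass widths=[1, 12, 8]. A quick local sanity check (a sketch run against the file on disk rather than S3):

    import pandas as pd

    df = pd.read_fwf("data_samples/fwf_nano.txt",
                     widths=[1, 12, 8],
                     names=["id", "name", "date"])
    assert len(df.index) == 5  # same row count the tests assert after the S3 round-trip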

data_samples/fwf_nano.txt.zip

263 Bytes
Binary file not shown.

testing/test_awswrangler/test_pandas.py

Lines changed: 25 additions & 0 deletions
@@ -2492,3 +2492,28 @@ def test_to_csv_string(bucket, database):
                     ctas_approach=False)
     wr.s3.delete_objects(path=path)
     assert df.equals(df2)
+
+
+@pytest.mark.parametrize("sample, row_num", [
+    ("data_samples/fwf_nano.txt", 5),
+    ("data_samples/fwf_nano.txt.zip", 5)
+])
+def test_read_fwf(bucket, sample, row_num):
+    path = f"s3://{bucket}/{sample}"
+    wr.s3.delete_objects(path=f"s3://{bucket}/")
+    boto3.client("s3").upload_file(sample, bucket, sample)
+    dataframe = wr.pandas.read_fwf(path=path, widths=[1, 12, 8], names=["id", "name", "date"])
+    wr.s3.delete_objects(path=path)
+    assert len(dataframe.index) == row_num
+
+
+def test_read_fwf_prefix(bucket):
+    path = f"s3://{bucket}/data_samples/"
+    wr.s3.delete_objects(path=f"s3://{bucket}/")
+    boto3.client("s3").upload_file("data_samples/fwf_nano.txt", bucket, "data_samples/fwf_nano.txt")
+    boto3.client("s3").upload_file("data_samples/fwf_nano.txt.zip", bucket, "data_samples/fwf_nano.txt.zip")
+    sleep(10)
+    dataframe = wr.pandas.read_fwf_prefix(path_prefix=path, widths=[1, 12, 8], names=["id", "name", "date"])
+    wr.s3.delete_objects(path=path)
+    print(dataframe)
+    assert len(dataframe.index) == 10
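
test_read_fwf_prefix expects 10 rows because read_fwf_prefix lists both objects under the data_samples/ prefix, the plain text file and its zipped copy with 5 rows each, and concatenates the results. A local sketch of the same arithmetic (pandas infers the zip compression from the extension):

    import pandas as pd

    kwargs = dict(widths=[1, 12, 8], names=["id", "name", "date"])
    dfs = [pd.read_fwf("data_samples/fwf_nano.txt", **kwargs),
           pd.read_fwf("data_samples/fwf_nano.txt.zip", **kwargs)]
    df = pd.concat(objs=dfs, ignore_index=True, sort=False)  # same merge step as read_fwf_list
    assert len(df.index) == 10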
