Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.0.2] - 2024-10-21

### Added

- type hints for `self` (fluent-interface return types)
- docstrings

## [0.0.1] - 2024-10-16

### Added

- initial version
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@

library to manipulate data

## Installation instructions
## Installation

```sh
pip install mysiar-data-flow
```

## DataFlow.DataFrame

### Usage
For now, check the [mysiar_data_flow/data_flow.py](mysiar_data_flow/data_flow.py) file for the interface



![work in progress](.github/5578703.png)
30 changes: 30 additions & 0 deletions Usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Usage

## DataFlow.DataFrame


Create an empty data frame object in memory, then load data into it from a Pandas data frame
```python

from mysiar_data_flow import DataFlow

df = DataFlow().DataFrame()
df.from_pandas(df=pandas_data_frame_obj)

```
Create a data frame object in memory directly from a Pandas data frame (chained call)
```python

from mysiar_data_flow import DataFlow

df = DataFlow().DataFrame().from_pandas(df=pandas_data_frame_obj)
```



---
For more details, check the [mysiar_data_flow/data_flow.py](https://github.com/mysiar-org/python-data-flow/blob/master/mysiar_data_flow/data_flow.py) file for the full interface



![work in progress](.github/5578703.png)
6 changes: 6 additions & 0 deletions mysiar_data_flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
"""
.. include:: ../README.md
.. include:: ../Usage.md
.. include:: ../CHANGELOG.md
"""

from .data_flow import DataFlow
143 changes: 88 additions & 55 deletions mysiar_data_flow/data_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,132 +53,149 @@ def __del__(self):
if not self.__in_memory:
delete_file(self.__filename)

def from_fireducks(self, df: fd.DataFrame):
def from_csv(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = df
self.__data = fd.read_csv(filename)
else:
from_fireducks_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
from_csv_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_fireducks(self) -> fd.DataFrame:
def from_feather(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
return self.__data
self.__data = fd.from_pandas(feather.read_feather(filename))
else:
return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type)
from_feather_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def from_pandas(self, df: pd.DataFrame):
def from_fireducks(self, df: fd.DataFrame) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = fd.from_pandas(df)
self.__data = df
else:
from_pandas_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
from_fireducks_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_pandas(self) -> pd.DataFrame:
def from_hdf(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
return self.__data.to_pandas()
self.__data = fd.read_hdf(filename)
else:
return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
from_hdf_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def from_polars(self, df: pl.DataFrame):
def from_json(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = fd.from_pandas(df.to_pandas())
self.__data = fd.read_json(filename)
else:
from_pandas_2_file(df=df.to_pandas(), tmp_filename=self.__filename, file_type=self.__file_type)
from_json_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_polars(self) -> pl.DataFrame:
def from_pandas(self, df: pd.DataFrame) -> "DataFlow.DataFrame":
if self.__in_memory:
return pl.from_pandas(self.__data.to_pandas())
self.__data = fd.from_pandas(df)
else:
return pl.from_pandas(
to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
)
from_pandas_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def from_csv(self, filename: str):
def from_parquet(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = fd.read_csv(filename)
self.__data = fd.read_parquet(filename)
else:
from_csv_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
from_parquet_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_csv(self, filename: str, index=False):
def from_polars(self, df: pl.DataFrame) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data.to_csv(filename, index=index)
self.__data = fd.from_pandas(df.to_pandas())
else:
to_csv_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
from_pandas_2_file(df=df.to_pandas(), tmp_filename=self.__filename, file_type=self.__file_type)
return self

def from_feather(self, filename: str):
def to_csv(self, filename: str, index=False) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = fd.from_pandas(feather.read_feather(filename))
self.__data.to_csv(filename, index=index)
else:
from_feather_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
to_csv_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_feather(self, filename: str):
def to_feather(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data.to_feather(filename)
else:
to_feather_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def from_parquet(self, filename: str):
def to_fireducks(self) -> fd.DataFrame:
if self.__in_memory:
self.__data = fd.read_parquet(filename)
return self.__data
else:
from_parquet_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self
return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type)

def to_parquet(self, filename: str):
def to_hdf(self, filename: str, key: str = "key") -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data.to_parquet(filename)
self.__data.to_hdf(path_or_buf=filename, key=key)
else:
to_parquet_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
to_hdf_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type, key=key)
return self

def from_json(self, filename: str):
def to_json(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = fd.read_json(filename)
self.__data.to_json(filename)
else:
from_json_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
to_json_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_json(self, filename: str):
def to_pandas(self) -> pd.DataFrame:
if self.__in_memory:
self.__data.to_json(filename)
return self.__data.to_pandas()
else:
to_json_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self
return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()

def from_hdf(self, filename: str):
def to_parquet(self, filename: str) -> "DataFlow.DataFrame":
if self.__in_memory:
self.__data = fd.read_hdf(filename)
self.__data.to_parquet(filename)
else:
from_hdf_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
to_parquet_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
return self

def to_hdf(self, filename: str, key: str = "key"):
def to_polars(self) -> pl.DataFrame:
if self.__in_memory:
self.__data.to_hdf(path_or_buf=filename, key=key)
return pl.from_pandas(self.__data.to_pandas())
else:
to_hdf_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type, key=key)
return self
return pl.from_pandas(
to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
)

def columns(self) -> list:
"""
lists columns in data frame

:return: list - list of columns in data frame
"""
if self.__in_memory:
return self.__data.columns.to_list()
else:
return data_get_columns(tmp_filename=self.__filename, file_type=self.__file_type)

def columns_delete(self, columns: list):
def columns_delete(self, columns: list) -> "DataFlow.DataFrame":
"""
deletes columns from data frame

:param columns: list - list of columns to delete
:return: self
"""
if self.__in_memory:
self.__data.drop(columns=columns, inplace=True)
else:
data_delete_columns(tmp_filename=self.__filename, file_type=self.__file_type, columns=columns)

return self

def columns_rename(self, columns_mapping: dict):
def columns_rename(self, columns_mapping: dict) -> "DataFlow.DataFrame":
"""
rename columns

:param columns_mapping: dict - old_name: new_name pairs ex. {"Year": "year", "Units": "units"}
:return: self
"""
if self.__in_memory:
self.__data.rename(columns=columns_mapping, inplace=True)
else:
Expand All @@ -189,13 +206,28 @@ def columns_rename(self, columns_mapping: dict):
)
return self

def columns_select(self, columns: list):
def columns_select(self, columns: list) -> "DataFlow.DataFrame":
"""
columns select - columns to keep in data frame

:param columns: list - list of columns to select
:return: self
"""
if self.__in_memory:
self.__data = self.__data[columns]
else:
data_select_columns(tmp_filename=self.__filename, file_type=self.__file_type, columns=columns)
return self

def filter_on_column(self, column: str, value: Any, operator: Operator):
def filter_on_column(self, column: str, value: Any, operator: Operator) -> "DataFlow.DataFrame":
"""
filters data on column

:param column: str - column name
:param value: Any - value
:param operator: mysiar_data_flow.lib.Operator - filter operator
:return: self
"""
if self.__in_memory:
match operator:
case Operator.Eq:
Expand All @@ -218,3 +250,4 @@ def filter_on_column(self, column: str, value: Any, operator: Operator):
value=value,
operator=operator,
)
return self
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
license = {file = "LICENSE"}
[tool.poetry]
name = "mysiar-data-flow"
version = "0.0.2rc1"
version = "0.0.2"
readme = "README.md"
description = "Python data manipulation library"
authors = ["Piotr Synowiec <psynowiec@gmail.com>"]
Expand Down
3 changes: 2 additions & 1 deletion requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pyproject-flake8
pytest
pytest-cov
poetry
twine
twine
pdoc