diff --git a/.gitignore b/.gitignore
index b512c09..a00cb39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,21 @@
-node_modules
\ No newline at end of file
+# Python cache files
+__pycache__/
+*.pyc
+*.pyo
+
+# Environments
+.venv/
+env/
+venv/
+
+# Node modules
+node_modules/
+
+# IDE
+.vscode/
+.idea/
+
+# OS junk
+.DS_Store
+Thumbs.db
+desktop.ini
diff --git a/app/etl/__pycache__/extract.cpython-313.pyc b/app/etl/__pycache__/extract.cpython-313.pyc
deleted file mode 100644
index 10b5268..0000000
Binary files a/app/etl/__pycache__/extract.cpython-313.pyc and /dev/null differ
diff --git a/app/etl/__pycache__/load.cpython-313.pyc b/app/etl/__pycache__/load.cpython-313.pyc
deleted file mode 100644
index 2790389..0000000
Binary files a/app/etl/__pycache__/load.cpython-313.pyc and /dev/null differ
diff --git a/app/etl/__pycache__/transform.cpython-313.pyc b/app/etl/__pycache__/transform.cpython-313.pyc
deleted file mode 100644
index 4347539..0000000
Binary files a/app/etl/__pycache__/transform.cpython-313.pyc and /dev/null differ
diff --git a/app/etl/extract.py b/app/etl/extract.py
index 94714f2..5044796 100644
--- a/app/etl/extract.py
+++ b/app/etl/extract.py
@@ -1,8 +1,13 @@
 import pandas as pd
 import os
+import logging # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 
 def extract(path: str = "xyz.csv") -> pd.DataFrame :
     """
     Extracts data from CSV file.
@@ -27,14 +32,16 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
     try:
         # Try different encodings
         encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-        df = None
+        df: Optional[pd.DataFrame] = None
 
         for encoding in encodings:
             try:
+                df = pd.read_csv(path, encoding=encoding)
+                break
                 # TODO (Find & Fix)
-                pass
+
             except UnicodeDecodeError:
-                print(f"Failed to read with encoding '{encoding}'") # Log the encoding that failed
+                logging.warning(f"Failed to read with encoding '{encoding}'") # Log the encoding that failed
 
         if df is None:
             raise ValueError(f" Could not read CSV with tried encodings: {encodings}")
@@ -43,7 +50,7 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
         if df.empty:
             raise ValueError("File contains no data")
 
-        print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns") # TODO: Use logging instead of print
+        logging.info(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns") # TODO: Use logging instead of print
         return df
 
     except pd.errors.EmptyDataError:
diff --git a/app/etl/load.py b/app/etl/load.py
index 97936f9..548b357 100644
--- a/app/etl/load.py
+++ b/app/etl/load.py
@@ -1,9 +1,14 @@
 import pandas as pd
 import sqlite3
 import os
+import logging # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 
 def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
     """
     Loads data into SQLite database with proper error handling and upsert logic.
@@ -14,10 +19,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
         table_name: Name of the table to create/update
     """
     if df.empty:
-        print("⚠️ Warning: Empty DataFrame received, nothing to load") # TODO (Find & Fix)
+        logging.warning("⚠️ Warning: Empty DataFrame received, nothing to load") # TODO (Find & Fix)
         return
 
-    print(f"🔄 Loading {len(df)} rows into database '{db_path}'") # TODO (Find & Fix)
+    logging.info(f"🔄 Loading {len(df)} rows into database '{db_path}'") # TODO (Find & Fix)
 
     # Ensure directory exists
     db_dir = os.path.dirname(db_path)
@@ -50,13 +55,21 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
                 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
             )
         """)
+        columns = ", ".join(df.columns)
+        placeholders = ", ".join(["?"] * len(df.columns))
+        update_clause = ", ".join([f"{col}=excluded.{col}" for col in df.columns if col != "employee_id"])
+
+        sql_query = f"""
+            INSERT INTO {table_name} ({columns})
+            VALUES ({placeholders})
+            ON CONFLICT(employee_id) DO UPDATE SET {update_clause};
+        """
 
         data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
-        placeholders = ", ".join(["?"] * len(df.columns))
-        column_names = ", ".join(df.columns)
-        sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
 
         cursor.executemany(sql_query, data_to_insert)
         conn.commit()
+
+        logging.info(f"Successfully loaded {len(df)} records into '{table_name}'.")
 
     # TODO (Find & Fix): Bulk insert without checking for duplicates
diff --git a/app/etl/transform.py b/app/etl/transform.py
index 13887c6..c197014 100644
--- a/app/etl/transform.py
+++ b/app/etl/transform.py
@@ -1,8 +1,16 @@
 import pandas as pd
+import logging
 from datetime import datetime # TODO (Find & Fix)
 from typing import Optional
 
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 
 def transform(df: pd.DataFrame) -> pd.DataFrame:
     """
     Transform data by cleaning and standardizing it.
@@ -14,31 +22,38 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
         Transformed DataFrame
     """
     if df.empty:
+        raise ValueError("DataFrame is Empty.")
         # TODO (Find & Fix): Should raise a ValueError if DataFrame is empty
-        pass
+
 
     # Create a copy to avoid modifying original
    df_transformed = df.copy()
 
-    print(f"🔄 Starting transformation of {len(df_transformed)} rows") # TODO (Find & Fix): Use logging instead of print
+    logger.info(f"🔄 Starting transformation of {len(df_transformed)} rows") # TODO (Find & Fix): Use logging instead of print
 
     # Handle duplicates
     initial_rows = len(df_transformed)
-    # TODO (Find & Fix): Duplicates are not removed
+    df_transformed.drop_duplicates(inplace=True)# TODO (Find & Fix): Duplicates are not removed
     duplicates_removed = initial_rows - len(df_transformed)
     if duplicates_removed > 0:
+        logger.info(f"Removed {duplicates_removed} duplicate rows.")
         # TODO (Find & Fix): Should log how many duplicates were removed
         pass
 
     # Handle null values in numeric columns
     numeric_columns = df_transformed.select_dtypes(include=['number']).columns
     for col in numeric_columns:
+        if df_transformed[col].isnull().any():
+            mean_value = df_transformed[col].mean()
+            df_transformed[col].fillna(mean_value, inplace=True)
         # TODO (Find & Fix): Nulls in numeric columns are not handled
         pass
 
     # Handle null values in text columns
     text_columns = df_transformed.select_dtypes(include=['object']).columns
     for col in text_columns:
+        if df_transformed[col].isnull().any():
+            df_transformed[col].fillna("Unknown", inplace=True)
         # TODO (Find & Fix): Nulls in text columns are not handled
         pass
 
@@ -47,8 +62,15 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
                     if any(keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated'])]
 
     for col in date_columns:
-        # TODO (Find & Fix): Date columns are not standardized
-        pass
+        df_transformed[col] = pd.to_datetime(df_transformed[col], errors='coerce')
+        if df_transformed[col].isnull().any():
+            median_date = df_transformed[col].median()
+            df_transformed[col].fillna(median_date, inplace=True)# TODO (Find & Fix): Date columns are not standardized
+
+    for col in text_columns:
+        df_transformed[col] = df_transformed[col].astype(str).str.strip().str.lower()
+
+    logger.info("Transformation completed successfully.")
 
     # TODO (Find & Fix): Text columns are not cleaned (strip, lowercase)
     return df_transformed
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
index e61920b..d24ed28 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,7 +1,12 @@
+import logging
 from app.etl.extract import extract
 from app.etl.transform import transform
 from app.etl.load import load
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
     """
     Run the complete ETL pipeline.
@@ -11,41 +16,46 @@ def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
         db_path: Path to the output SQLite database
     """
     try:
-        print("🚀 Starting ETL Pipeline") # TODO (Find & Fix): Use logging instead of print
-        print(f"📁 Input file: {csv_path}")
-        print(f"🗄️ Output database: {db_path}")
-        print("-" * 50)
+        logging.info("🚀 Starting ETL Pipeline") # TODO (Find & Fix): Use logging instead of logging.info
+        logging.info(f"📁 Input file: {csv_path}")
+        logging.info(f"🗄️ Output database: {db_path}")
+        logging.info("-" * 50)
 
         # Extract
-        print("📥 STEP 1: EXTRACT")
+        logging.info("📥 STEP 1: EXTRACT")
         df = extract(csv_path)
-        print(f"✅ Extracted {len(df)} rows")
-        print(f"📊 Columns: {list(df.columns)}")
-        print()
+        logging.info(f"✅ Extracted {len(df)} rows")
+        logging.info(f"📊 Columns: {list(df.columns)}")
+        logging.info("-" * 50)
+
 
         # Transform
-        print("🔄 STEP 2: TRANSFORM")
+        logging.info("🔄 STEP 2: TRANSFORM")
         df_transformed = transform(df)
-        print(f"✅ Transformed data ready")
-        print()
+        logging.info(f"✅ Transformed data ready")
+        logging.info("-" * 50)
+
 
         # Load
-        print("📤 STEP 3: LOAD")
+        logging.info("📤 STEP 3: LOAD")
         load(df_transformed, db_path)
-        print()
+        logging.info("-" * 50)
+
 
-        print("🎉 ETL Pipeline completed successfully!")
-        print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
+        logging.info("🎉 ETL Pipeline completed successfully!")
+        logging.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
 
     except FileNotFoundError as e:
-        print(f"❌ File Error: {e}")
+        logging.error(f"❌ File Error: {e}")
     except ValueError as e:
+        logging.error(f"⚠️ Value Error: {e}")
+        raise
         # TODO (Find & Fix): Error handling missing
-        pass
+
     except Exception as e:
         # TODO (Find & Fix): Error handling missing
-        pass
+        logging.exception(f"🔥 Unexpected error: {e}")
 
 if __name__ == "__main__":
     # Run the pipeline
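
Reviewer note on the new upsert in app/etl/load.py: the generated statement uses SQLite's ON CONFLICT(employee_id) DO UPDATE clause, which only takes effect when the target table declares employee_id as a PRIMARY KEY or UNIQUE column (and it requires SQLite 3.24 or newer), so it is worth double-checking that the CREATE TABLE in load.py declares that constraint. A minimal, self-contained sketch of the same pattern is below; the table layout and sample rows are illustrative assumptions, not taken from this diff:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    # employee_id must be PRIMARY KEY (or UNIQUE) for ON CONFLICT(employee_id) to apply
    cur.execute("CREATE TABLE processed_data (employee_id INTEGER PRIMARY KEY, name TEXT, salary REAL)")

    sql = """
        INSERT INTO processed_data (employee_id, name, salary)
        VALUES (?, ?, ?)
        ON CONFLICT(employee_id) DO UPDATE SET
            name = excluded.name,
            salary = excluded.salary;
    """
    cur.executemany(sql, [(1, "alice", 50000.0), (2, "bob", 60000.0)])  # first load: plain inserts
    cur.executemany(sql, [(1, "alice", 55000.0)])  # same key again: existing row is updated in place
    conn.commit()

    print(cur.execute("SELECT * FROM processed_data ORDER BY employee_id").fetchall())
    # [(1, 'alice', 55000.0), (2, 'bob', 60000.0)] -> no duplicate row for employee_id 1

Unlike the previous INSERT OR IGNORE, which silently dropped re-extracted rows, this keeps reruns of the pipeline idempotent while still picking up changed values.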