Commit ea18815
fix: handle null values in transform function
Add validation to check for empty or null DataFrames to prevent runtime errors. Implements fillna() to handle missing values and ensure data consistency during transformation.
1 parent 6e5d072 commit ea18815
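
As a rough sketch (not code from this commit) of the validate-then-fill pattern the message describes, mirroring the mean/"Unknown" imputation added to transform.py below:

    import pandas as pd

    def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
        # Reject absent or empty input up front to prevent runtime errors downstream.
        if df is None or df.empty:
            raise ValueError("DataFrame is empty or None.")
        out = df.copy()
        # Numeric columns: impute missing values with the column mean.
        for col in out.select_dtypes(include=["number"]).columns:
            out[col] = out[col].fillna(out[col].mean())
        # Text columns: fill missing values with a sentinel.
        for col in out.select_dtypes(include=["object"]).columns:
            out[col] = out[col].fillna("Unknown")
        return out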

File tree

9 files changed: +394 −581 lines
Binary file not shown (2.34 KB)
Binary file not shown (2.55 KB)
Binary file not shown (3.32 KB)
Binary file not shown (3.8 KB)

app/etl/extract.py

Lines changed: 11 additions & 4 deletions
@@ -1,8 +1,13 @@
 import pandas as pd
 import os
+import logging
 # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def extract(path: str = "xyz.csv") -> pd.DataFrame :
     """
     Extracts data from CSV file.
@@ -27,14 +32,16 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
     try:
         # Try different encodings
         encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-        df = None
+        df: Optional[pd.DataFrame] = None
 
         for encoding in encodings:
             try:
+                df = pd.read_csv(path, encoding=encoding)
+                break
                 # TODO (Find & Fix)
-                pass
+
             except UnicodeDecodeError:
-                print(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
+                logging.warning(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
 
         if df is None:
             raise ValueError(f" Could not read CSV with tried encodings: {encodings}")
@@ -43,7 +50,7 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
         if df.empty:
             raise ValueError("File contains no data")
 
-        print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")  # TODO: Use logging instead of print
+        logging.info(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")  # TODO: Use logging instead of print
         return df
 
     except pd.errors.EmptyDataError:
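
For reference, the encoding-fallback loop added above reduces to this standalone sketch (hypothetical helper name and path):

    import logging
    import pandas as pd

    def read_with_fallback(path: str) -> pd.DataFrame:
        # Try progressively more permissive encodings until one decodes the file.
        for encoding in ["utf-8", "latin-1", "cp1252", "iso-8859-1"]:
            try:
                return pd.read_csv(path, encoding=encoding)
            except UnicodeDecodeError:
                logging.warning("Failed to read with encoding '%s'", encoding)
        raise ValueError(f"Could not read {path} with any tried encoding")

One caveat worth noting: 'latin-1' maps every byte to a character and never raises UnicodeDecodeError, so in practice the loop will not get past the second entry; it will decode the bytes (possibly incorrectly) rather than fall through.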

app/etl/load.py

Lines changed: 18 additions & 5 deletions
@@ -1,9 +1,14 @@
 import pandas as pd
 import sqlite3
 import os
+import logging
 # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
     """
     Loads data into SQLite database with proper error handling and upsert logic.
@@ -14,10 +19,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
         table_name: Name of the table to create/update
     """
     if df.empty:
-        print("⚠️ Warning: Empty DataFrame received, nothing to load")  # TODO (Find & Fix)
+        logging.warning("⚠️ Warning: Empty DataFrame received, nothing to load")  # TODO (Find & Fix)
         return
 
-    print(f"🔄 Loading {len(df)} rows into database '{db_path}'")  # TODO (Find & Fix)
+    logging.info(f"🔄 Loading {len(df)} rows into database '{db_path}'")  # TODO (Find & Fix)
 
     # Ensure directory exists
     db_dir = os.path.dirname(db_path)
@@ -50,13 +55,21 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
             updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         )
     """)
+    columns = ", ".join(df.columns)
+    placeholders = ", ".join(["?"] * len(df.columns))
+    update_clause = ", ".join([f"{col}=excluded.{col}" for col in df.columns if col != "employee_id"])
+
+    sql_query = f"""
+        INSERT INTO {table_name} ({columns})
+        VALUES ({placeholders})
+        ON CONFLICT(employee_id) DO UPDATE SET {update_clause};
+    """
 
     data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
-    placeholders = ", ".join(["?"] * len(df.columns))
-    column_names = ", ".join(df.columns)
-    sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
    cursor.executemany(sql_query, data_to_insert)
     conn.commit()
+
+    logging.info(f"Successfully loaded {len(df)} records into '{table_name}'.")
     # TODO (Find & Fix): Bulk insert without checking for duplicates

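The switch from INSERT OR IGNORE to ON CONFLICT ... DO UPDATE changes duplicate handling from "silently skip" to "update in place". A minimal round-trip showing the semantics, assuming employee_id carries a PRIMARY KEY or UNIQUE constraint (which ON CONFLICT requires; the table and values here are hypothetical):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_data (employee_id INTEGER PRIMARY KEY, name TEXT)")
    sql = """
        INSERT INTO processed_data (employee_id, name) VALUES (?, ?)
        ON CONFLICT(employee_id) DO UPDATE SET name=excluded.name;
    """
    conn.execute(sql, (1, "alice"))
    conn.execute(sql, (1, "alicia"))  # same key: the existing row is updated
    print(conn.execute("SELECT * FROM processed_data").fetchall())  # [(1, 'alicia')]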
app/etl/transform.py

Lines changed: 27 additions & 5 deletions
@@ -1,8 +1,16 @@
 import pandas as pd
+import logging
 from datetime import datetime
 # TODO (Find & Fix)
 from typing import Optional
 
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 def transform(df: pd.DataFrame) -> pd.DataFrame:
     """
     Transform data by cleaning and standardizing it.
@@ -14,31 +22,38 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
         Transformed DataFrame
     """
     if df.empty:
+        raise ValueError("DataFrame is Empty.")
         # TODO (Find & Fix): Should raise a ValueError if DataFrame is empty
-        pass
+
 
     # Create a copy to avoid modifying original
     df_transformed = df.copy()
 
-    print(f"🔄 Starting transformation of {len(df_transformed)} rows")  # TODO (Find & Fix): Use logging instead of print
+    logger.info(f"🔄 Starting transformation of {len(df_transformed)} rows")  # TODO (Find & Fix): Use logging instead of print
 
     # Handle duplicates
     initial_rows = len(df_transformed)
-    # TODO (Find & Fix): Duplicates are not removed
+    df_transformed.drop_duplicates(inplace=True)  # TODO (Find & Fix): Duplicates are not removed
     duplicates_removed = initial_rows - len(df_transformed)
     if duplicates_removed > 0:
+        logger.info(f"Removed {duplicates_removed} duplicate rows.")
         # TODO (Find & Fix): Should log how many duplicates were removed
         pass
 
     # Handle null values in numeric columns
     numeric_columns = df_transformed.select_dtypes(include=['number']).columns
     for col in numeric_columns:
+        if df_transformed[col].isnull().any():
+            mean_value = df_transformed[col].mean()
+            df_transformed[col].fillna(mean_value, inplace=True)
         # TODO (Find & Fix): Nulls in numeric columns are not handled
         pass
 
     # Handle null values in text columns
     text_columns = df_transformed.select_dtypes(include=['object']).columns
     for col in text_columns:
+        if df_transformed[col].isnull().any():
+            df_transformed[col].fillna("Unknown", inplace=True)
         # TODO (Find & Fix): Nulls in text columns are not handled
         pass
 
@@ -47,8 +62,15 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
                     if any(keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated'])]
 
     for col in date_columns:
-        # TODO (Find & Fix): Date columns are not standardized
-        pass
+        df_transformed[col] = pd.to_datetime(df_transformed[col], errors='coerce')
+        if df_transformed[col].isnull().any():
+            median_date = df_transformed[col].median()
+            df_transformed[col].fillna(median_date, inplace=True)  # TODO (Find & Fix): Date columns are not standardized
+
+    for col in text_columns:
+        df_transformed[col] = df_transformed[col].astype(str).str.strip().str.lower()
+
+    logger.info("Transformation completed successfully.")
 
     # TODO (Find & Fix): Text columns are not cleaned (strip, lowercase)
     return df_transformed
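
The date handling above can be sanity-checked in isolation; with sample values (hypothetical data), unparseable entries become NaT and are then filled with the median of the valid dates:

    import pandas as pd

    dates = pd.Series(["2024-01-01", "not a date", "2024-01-03", "2024-01-05"])
    parsed = pd.to_datetime(dates, errors="coerce")  # "not a date" -> NaT
    parsed = parsed.fillna(parsed.median())          # NaT -> 2024-01-03, the median of the valid dates
    print(parsed.tolist())

Note that the column-level fillna(..., inplace=True) calls in the diff trigger chained-assignment warnings on recent pandas releases; assigning the result back, as in this sketch, is the forward-compatible form.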

app/main.py

Lines changed: 28 additions & 18 deletions
@@ -1,7 +1,12 @@
+import logging
 from app.etl.extract import extract
 from app.etl.transform import transform
 from app.etl.load import load
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
     """
     Run the complete ETL pipeline.
@@ -11,41 +16,46 @@ def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
         db_path: Path to the output SQLite database
     """
     try:
-        print("🚀 Starting ETL Pipeline")  # TODO (Find & Fix): Use logging instead of print
-        print(f"📁 Input file: {csv_path}")
-        print(f"🗄️ Output database: {db_path}")
-        print("-" * 50)
+        logging.info("🚀 Starting ETL Pipeline")  # TODO (Find & Fix): Use logging instead of logging.info
+        logging.info(f"📁 Input file: {csv_path}")
+        logging.info(f"🗄️ Output database: {db_path}")
+        logging.info("-" * 50)
 
         # Extract
-        print("📥 STEP 1: EXTRACT")
+        logging.info("📥 STEP 1: EXTRACT")
         df = extract(csv_path)
-        print(f"✅ Extracted {len(df)} rows")
-        print(f"📊 Columns: {list(df.columns)}")
-        print()
+        logging.info(f"✅ Extracted {len(df)} rows")
+        logging.info(f"📊 Columns: {list(df.columns)}")
+        logging.info("-" * 50)
+
 
         # Transform
-        print("🔄 STEP 2: TRANSFORM")
+        logging.info("🔄 STEP 2: TRANSFORM")
         df_transformed = transform(df)
-        print(f"✅ Transformed data ready")
-        print()
+        logging.info(f"✅ Transformed data ready")
+        logging.info("-" * 50)
+
 
         # Load
-        print("📤 STEP 3: LOAD")
+        logging.info("📤 STEP 3: LOAD")
         load(df_transformed, db_path)
-        print()
+        logging.info("-" * 50)
+
 
-        print("🎉 ETL Pipeline completed successfully!")
-        print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
+        logging.info("🎉 ETL Pipeline completed successfully!")
+        logging.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
 
     except FileNotFoundError as e:
-        print(f"❌ File Error: {e}")
+        logging.error(f"❌ File Error: {e}")
 
     except ValueError as e:
+        logging.error(f"⚠️ Value Error: {e}")
+        raise
         # TODO (Find & Fix): Error handling missing
-        pass
+
     except Exception as e:
         # TODO (Find & Fix): Error handling missing
-        pass
+        logging.exception(f"🔥 Unexpected error: {e}")
 
 if __name__ == "__main__":
     # Run the pipeline
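
With these changes in place, the pipeline can be exercised end to end (the paths shown are the defaults from the signature above):

    from app.main import run_pipeline

    # Extract from CSV, transform, and load into SQLite in one call.
    run_pipeline(csv_path="data.csv", db_path="etl_data.db")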
