
Commit 608c68d

fix: handle null values and improve ETL pipeline
Updated extract, load, transform, and main scripts to handle missing data and improve consistency.
1 parent 6e5d072 commit 608c68d

File tree: 4 files changed (+84, -32 lines)

app/etl/extract.py

Lines changed: 11 additions & 4 deletions
@@ -1,8 +1,13 @@
 import pandas as pd
 import os
+import logging
 # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def extract(path: str = "xyz.csv") -> pd.DataFrame :
     """
     Extracts data from CSV file.
@@ -27,14 +32,16 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
     try:
         # Try different encodings
         encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-        df = None
+        df: Optional[pd.DataFrame] = None
 
         for encoding in encodings:
             try:
+                df = pd.read_csv(path, encoding=encoding)
+                break
                 # TODO (Find & Fix)
-                pass
+
             except UnicodeDecodeError:
-                print(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
+                logging.warning(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
 
         if df is None:
             raise ValueError(f" Could not read CSV with tried encodings: {encodings}")
@@ -43,7 +50,7 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
         if df.empty:
             raise ValueError("File contains no data")
 
-        print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")  # TODO: Use logging instead of print
+        logging.info(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")
         return df
 
     except pd.errors.EmptyDataError:
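Review note: the fallback loop now attempts each candidate encoding in turn and stops at the first successful read. A minimal smoke test for the pattern, assuming the repository root is on PYTHONPATH; the file name and contents below are illustrative, not part of the commit:

import pandas as pd

from app.etl.extract import extract

# A cp1252-encoded file makes the first (utf-8) attempt raise UnicodeDecodeError,
# so extract() should log one warning and then succeed via the latin-1 fallback.
pd.DataFrame({"employee_id": [1], "name": ["café"]}).to_csv(
    "sample_cp1252.csv", index=False, encoding="cp1252"
)

df = extract("sample_cp1252.csv")
print(df.shape)  # (1, 2)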

app/etl/load.py

Lines changed: 18 additions & 5 deletions
@@ -1,9 +1,14 @@
 import pandas as pd
 import sqlite3
 import os
+import logging
 # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
     """
     Loads data into SQLite database with proper error handling and upsert logic.
@@ -14,10 +19,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
         table_name: Name of the table to create/update
     """
     if df.empty:
-        print("⚠️ Warning: Empty DataFrame received, nothing to load")  # TODO (Find & Fix)
+        logging.warning("⚠️ Empty DataFrame received, nothing to load")
         return
 
-    print(f"🔄 Loading {len(df)} rows into database '{db_path}'")  # TODO (Find & Fix)
+    logging.info(f"🔄 Loading {len(df)} rows into database '{db_path}'")
 
     # Ensure directory exists
     db_dir = os.path.dirname(db_path)
@@ -50,13 +55,21 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
             updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         )
     """)
+    columns = ", ".join(df.columns)
+    placeholders = ", ".join(["?"] * len(df.columns))
+    update_clause = ", ".join([f"{col}=excluded.{col}" for col in df.columns if col != "employee_id"])
+
+    sql_query = f"""
+        INSERT INTO {table_name} ({columns})
+        VALUES ({placeholders})
+        ON CONFLICT(employee_id) DO UPDATE SET {update_clause};
+    """
 
     data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
-    placeholders = ", ".join(["?"] * len(df.columns))
-    column_names = ", ".join(df.columns)
-    sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
     cursor.executemany(sql_query, data_to_insert)
     conn.commit()
+
+    logging.info(f"Successfully loaded {len(df)} records into '{table_name}'.")
    # TODO (Find & Fix): Bulk insert without checking for duplicates
 
 
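Review note: the INSERT OR IGNORE statement is replaced by a true upsert. ON CONFLICT(employee_id) only fires if the table declares employee_id as PRIMARY KEY or UNIQUE (the CREATE TABLE is only partially visible in this hunk), excluded.<col> refers to the row that failed to insert, and ON CONFLICT ... DO UPDATE requires SQLite 3.24+. A sketch of the SQL the new code generates, using a hypothetical three-column frame:

import pandas as pd

df = pd.DataFrame({"employee_id": [1, 2], "name": ["Ada", "Grace"], "salary": [100, 200]})

# Same string-building steps as load() above.
columns = ", ".join(df.columns)
placeholders = ", ".join(["?"] * len(df.columns))
update_clause = ", ".join(f"{col}=excluded.{col}" for col in df.columns if col != "employee_id")

print(f"INSERT INTO processed_data ({columns}) VALUES ({placeholders}) "
      f"ON CONFLICT(employee_id) DO UPDATE SET {update_clause};")
# Prints (wrapped here for readability):
#   INSERT INTO processed_data (employee_id, name, salary) VALUES (?, ?, ?)
#   ON CONFLICT(employee_id) DO UPDATE SET name=excluded.name, salary=excluded.salary;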
app/etl/transform.py

Lines changed: 27 additions & 5 deletions
@@ -1,8 +1,16 @@
 import pandas as pd
+import logging
 from datetime import datetime
 # TODO (Find & Fix)
 from typing import Optional
 
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 def transform(df: pd.DataFrame) -> pd.DataFrame:
     """
     Transform data by cleaning and standardizing it.
@@ -14,31 +22,38 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
         Transformed DataFrame
     """
     if df.empty:
+        raise ValueError("DataFrame is empty.")
         # TODO (Find & Fix): Should raise a ValueError if DataFrame is empty
-        pass
+
 
     # Create a copy to avoid modifying original
     df_transformed = df.copy()
 
-    print(f"🔄 Starting transformation of {len(df_transformed)} rows")  # TODO (Find & Fix): Use logging instead of print
+    logger.info(f"🔄 Starting transformation of {len(df_transformed)} rows")
 
     # Handle duplicates
     initial_rows = len(df_transformed)
-    # TODO (Find & Fix): Duplicates are not removed
+    df_transformed.drop_duplicates(inplace=True)
     duplicates_removed = initial_rows - len(df_transformed)
     if duplicates_removed > 0:
+        logger.info(f"Removed {duplicates_removed} duplicate rows.")
         # TODO (Find & Fix): Should log how many duplicates were removed
         pass
 
     # Handle null values in numeric columns
     numeric_columns = df_transformed.select_dtypes(include=['number']).columns
     for col in numeric_columns:
+        if df_transformed[col].isnull().any():
+            mean_value = df_transformed[col].mean()
+            df_transformed[col].fillna(mean_value, inplace=True)
         # TODO (Find & Fix): Nulls in numeric columns are not handled
         pass
 
     # Handle null values in text columns
     text_columns = df_transformed.select_dtypes(include=['object']).columns
     for col in text_columns:
+        if df_transformed[col].isnull().any():
+            df_transformed[col].fillna("Unknown", inplace=True)
         # TODO (Find & Fix): Nulls in text columns are not handled
         pass
 
@@ -47,8 +62,15 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
                     if any(keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated'])]
 
     for col in date_columns:
-        # TODO (Find & Fix): Date columns are not standardized
-        pass
+        df_transformed[col] = pd.to_datetime(df_transformed[col], errors='coerce')
+        if df_transformed[col].isnull().any():
+            median_date = df_transformed[col].median()
+            df_transformed[col].fillna(median_date, inplace=True)
+
+    for col in text_columns:
+        df_transformed[col] = df_transformed[col].astype(str).str.strip().str.lower()
+
+    logger.info("Transformation completed successfully.")
 
     # TODO (Find & Fix): Text columns are not cleaned (strip, lowercase)
     return df_transformed
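Review note: a quick way to exercise the new duplicate and null handling end to end. A minimal sketch with a hypothetical frame (column names are illustrative, not from the commit); note that the df[col].fillna(..., inplace=True) pattern above operates on an intermediate object and may emit chained-assignment FutureWarnings on pandas 2.x:

import pandas as pd

from app.etl.transform import transform

raw = pd.DataFrame({
    "employee_id": [1, 1, 2],
    "name": ["  Ada ", "  Ada ", None],
    "salary": [100.0, 100.0, None],
})

clean = transform(raw)
print(clean)
# Expected: 2 rows after drop_duplicates; the salary null becomes the column
# mean (100.0); the name null is filled with "Unknown", and the strip/lower
# pass then yields "ada" and "unknown".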

app/main.py

Lines changed: 28 additions & 18 deletions
@@ -1,7 +1,12 @@
+import logging
 from app.etl.extract import extract
 from app.etl.transform import transform
 from app.etl.load import load
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
     """
     Run the complete ETL pipeline.
@@ -11,41 +16,46 @@ def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
         db_path: Path to the output SQLite database
     """
     try:
-        print("🚀 Starting ETL Pipeline")  # TODO (Find & Fix): Use logging instead of print
-        print(f"📁 Input file: {csv_path}")
-        print(f"🗄️ Output database: {db_path}")
-        print("-" * 50)
+        logging.info("🚀 Starting ETL Pipeline")
+        logging.info(f"📁 Input file: {csv_path}")
+        logging.info(f"🗄️ Output database: {db_path}")
+        logging.info("-" * 50)
 
         # Extract
-        print("📥 STEP 1: EXTRACT")
+        logging.info("📥 STEP 1: EXTRACT")
         df = extract(csv_path)
-        print(f"✅ Extracted {len(df)} rows")
-        print(f"📊 Columns: {list(df.columns)}")
-        print()
+        logging.info(f"✅ Extracted {len(df)} rows")
+        logging.info(f"📊 Columns: {list(df.columns)}")
+        logging.info("-" * 50)
+
 
         # Transform
-        print("🔄 STEP 2: TRANSFORM")
+        logging.info("🔄 STEP 2: TRANSFORM")
         df_transformed = transform(df)
-        print(f"✅ Transformed data ready")
-        print()
+        logging.info("✅ Transformed data ready")
+        logging.info("-" * 50)
+
 
         # Load
-        print("📤 STEP 3: LOAD")
+        logging.info("📤 STEP 3: LOAD")
         load(df_transformed, db_path)
-        print()
+        logging.info("-" * 50)
+
 
-        print("🎉 ETL Pipeline completed successfully!")
-        print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
+        logging.info("🎉 ETL Pipeline completed successfully!")
+        logging.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
 
     except FileNotFoundError as e:
-        print(f"❌ File Error: {e}")
+        logging.error(f"❌ File Error: {e}")
 
     except ValueError as e:
+        logging.error(f"⚠️ Value Error: {e}")
+        raise
         # TODO (Find & Fix): Error handling missing
-        pass
+
     except Exception as e:
         # TODO (Find & Fix): Error handling missing
-        pass
+        logging.exception(f"🔥 Unexpected error: {e}")
 
 if __name__ == "__main__":
     # Run the pipeline
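Review note: with the per-step prints converted to logging, a run can be verified from the log stream alone. A hypothetical end-to-end invocation, assuming the repository root is on PYTHONPATH and the frame's columns match the processed_data schema created by load() (the full CREATE TABLE is not visible in this diff):

import pandas as pd

from app.main import run_pipeline

# Illustrative input: one duplicate row and one numeric null.
pd.DataFrame({
    "employee_id": [1, 2, 2],
    "name": ["Ada", "Grace", "Grace"],
    "salary": [100.0, None, None],
}).to_csv("data.csv", index=False)

run_pipeline(csv_path="data.csv", db_path="etl_data.db")
# Expected log lines: STEP 1 extracts 3 rows, STEP 2 drops one duplicate and
# imputes the salary null, STEP 3 upserts 2 rows, then the success banner.
# Note the ValueError branch now re-raises, so the empty-DataFrame error from
# transform() propagates to the caller instead of being silently swallowed.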
