From 608c68d611974839ed444deff092ea95dacac816 Mon Sep 17 00:00:00 2001
From: Ayush
Date: Fri, 7 Nov 2025 08:52:00 +0530
Subject: [PATCH 1/3] fix: handle null values and improve ETL pipeline

Updated the extract, transform, load, and main scripts: read CSVs with an
encoding fallback, impute missing numeric/text/date values, drop duplicate
rows, upsert on employee_id instead of silently ignoring conflicts, and
report progress through logging rather than print.
---
 app/etl/extract.py   | 15 +++++++++++----
 app/etl/load.py      | 23 +++++++++++++++++-----
 app/etl/transform.py | 32 +++++++++++++++++++++++++-----
 app/main.py          | 46 +++++++++++++++++++++++++++-----------------
 4 files changed, 84 insertions(+), 32 deletions(-)

diff --git a/app/etl/extract.py b/app/etl/extract.py
index 94714f2..5044796 100644
--- a/app/etl/extract.py
+++ b/app/etl/extract.py
@@ -1,8 +1,13 @@
 import pandas as pd
 import os
+import logging
 # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def extract(path: str = "xyz.csv") -> pd.DataFrame :
     """
     Extracts data from CSV file.
@@ -27,14 +32,16 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
     try:
         # Try different encodings
         encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-        df = None
+        df: Optional[pd.DataFrame] = None
 
         for encoding in encodings:
             try:
+                df = pd.read_csv(path, encoding=encoding)
+                break
                 # TODO (Find & Fix)
-                pass
+
             except UnicodeDecodeError:
-                print(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
+                logging.warning(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
 
         if df is None:
             raise ValueError(f" Could not read CSV with tried encodings: {encodings}")
@@ -43,7 +50,7 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame :
         if df.empty:
             raise ValueError("File contains no data")
-        print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")  # TODO: Use logging instead of print
+        logging.info(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")
 
         return df
 
     except pd.errors.EmptyDataError:
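
Reviewer note (not part of the patch): the hunk above retries pd.read_csv once
per candidate encoding and keeps the first successful read. A minimal
standalone sketch of the same fallback pattern, with a hypothetical
read_csv_with_fallback name:

    import logging
    from typing import Optional

    import pandas as pd

    def read_csv_with_fallback(path: str) -> pd.DataFrame:
        """Return the first successful read from a list of candidate encodings."""
        df: Optional[pd.DataFrame] = None
        for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
            try:
                df = pd.read_csv(path, encoding=encoding)
                break  # first encoding that decodes wins
            except UnicodeDecodeError:
                logging.warning("Failed to read with encoding '%s'", encoding)
        if df is None:
            raise ValueError(f"Could not read {path!r} with any tried encoding")
        return df

One caveat worth knowing: 'latin-1' maps every possible byte value, so it never
raises UnicodeDecodeError; with this candidate order the loop can never reach
'cp1252' or 'iso-8859-1', it just may decode some characters incorrectly.
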
diff --git a/app/etl/load.py b/app/etl/load.py
index 97936f9..548b357 100644
--- a/app/etl/load.py
+++ b/app/etl/load.py
@@ -1,9 +1,14 @@
 import pandas as pd
 import sqlite3
 import os
+import logging
 # TODO (Find & Fix)
 from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
     """
     Loads data into SQLite database with proper error handling and upsert logic.
@@ -14,10 +19,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
         table_name: Name of the table to create/update
     """
     if df.empty:
-        print("⚠️ Warning: Empty DataFrame received, nothing to load")  # TODO (Find & Fix)
+        logging.warning("⚠️ Empty DataFrame received, nothing to load")
         return
 
-    print(f"🔄 Loading {len(df)} rows into database '{db_path}'")  # TODO (Find & Fix)
+    logging.info(f"🔄 Loading {len(df)} rows into database '{db_path}'")
 
     # Ensure directory exists
     db_dir = os.path.dirname(db_path)
@@ -50,13 +55,21 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
                 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
             )
         """)
+        columns = ", ".join(df.columns)
+        placeholders = ", ".join(["?"] * len(df.columns))
+        update_clause = ", ".join([f"{col}=excluded.{col}" for col in df.columns if col != "employee_id"])
+
+        sql_query = f"""
+            INSERT INTO {table_name} ({columns})
+            VALUES ({placeholders})
+            ON CONFLICT(employee_id) DO UPDATE SET {update_clause};
+        """
 
         data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
-        placeholders = ", ".join(["?"] * len(df.columns))
-        column_names = ", ".join(df.columns)
-        sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
 
         cursor.executemany(sql_query, data_to_insert)
         conn.commit()
+
+        logging.info(f"Successfully loaded {len(df)} records into '{table_name}'.")
 
         # TODO (Find & Fix): Bulk insert without checking for duplicates
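
Reviewer note (not part of the patch): ON CONFLICT(employee_id) DO UPDATE only
fires when employee_id carries a PRIMARY KEY or UNIQUE constraint; without one
SQLite rejects the statement outright, and the clause needs SQLite >= 3.24.
The CREATE TABLE is only partially visible in this hunk, so assuming such a
constraint exists, a minimal sketch of the upsert behaviour on a toy table:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    # employee_id must be PRIMARY KEY (or UNIQUE) for ON CONFLICT to match
    cur.execute("CREATE TABLE processed_data (employee_id INTEGER PRIMARY KEY, name TEXT)")
    rows = [(1, "ada"), (1, "ada lovelace")]  # second tuple collides on employee_id
    cur.executemany(
        "INSERT INTO processed_data (employee_id, name) VALUES (?, ?) "
        "ON CONFLICT(employee_id) DO UPDATE SET name = excluded.name",
        rows,
    )
    conn.commit()
    print(cur.execute("SELECT * FROM processed_data").fetchall())
    # -> [(1, 'ada lovelace')]: the conflicting row was updated in place
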
diff --git a/app/etl/transform.py b/app/etl/transform.py
index 13887c6..c197014 100644
--- a/app/etl/transform.py
+++ b/app/etl/transform.py
@@ -1,8 +1,16 @@
 import pandas as pd
+import logging
 from datetime import datetime
 # TODO (Find & Fix)
 from typing import Optional
 
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 def transform(df: pd.DataFrame) -> pd.DataFrame:
     """
     Transform data by cleaning and standardizing it.
@@ -14,31 +22,38 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
     Returns:
         Transformed DataFrame
     """
     if df.empty:
+        raise ValueError("DataFrame is empty.")
         # TODO (Find & Fix): Should raise a ValueError if DataFrame is empty
-        pass
+
     # Create a copy to avoid modifying original
     df_transformed = df.copy()
 
-    print(f"🔄 Starting transformation of {len(df_transformed)} rows")  # TODO (Find & Fix): Use logging instead of print
+    logger.info(f"🔄 Starting transformation of {len(df_transformed)} rows")
 
     # Handle duplicates
     initial_rows = len(df_transformed)
-    # TODO (Find & Fix): Duplicates are not removed
+    df_transformed.drop_duplicates(inplace=True)
     duplicates_removed = initial_rows - len(df_transformed)
     if duplicates_removed > 0:
+        logger.info(f"Removed {duplicates_removed} duplicate rows.")
         # TODO (Find & Fix): Should log how many duplicates were removed
         pass
 
     # Handle null values in numeric columns
     numeric_columns = df_transformed.select_dtypes(include=['number']).columns
     for col in numeric_columns:
+        if df_transformed[col].isnull().any():
+            mean_value = df_transformed[col].mean()
+            df_transformed[col].fillna(mean_value, inplace=True)
         # TODO (Find & Fix): Nulls in numeric columns are not handled
         pass
 
     # Handle null values in text columns
     text_columns = df_transformed.select_dtypes(include=['object']).columns
     for col in text_columns:
+        if df_transformed[col].isnull().any():
+            df_transformed[col].fillna("Unknown", inplace=True)
         # TODO (Find & Fix): Nulls in text columns are not handled
         pass
 
@@ -47,8 +62,15 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
                     if any(keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated'])]
 
     for col in date_columns:
-        # TODO (Find & Fix): Date columns are not standardized
-        pass
+        df_transformed[col] = pd.to_datetime(df_transformed[col], errors='coerce')
+        if df_transformed[col].isnull().any():
+            median_date = df_transformed[col].median()
+            df_transformed[col].fillna(median_date, inplace=True)
+
+    for col in text_columns:
+        df_transformed[col] = df_transformed[col].astype(str).str.strip().str.lower()
+
+    logger.info("Transformation completed successfully.")
     # TODO (Find & Fix): Text columns are not cleaned (strip, lowercase)
 
     return df_transformed
\ No newline at end of file
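
Reviewer note (not part of the patch): the imputation above mutates column
slices via df[col].fillna(..., inplace=True). pandas 2.x started emitting
chained-assignment FutureWarnings for exactly this pattern, and under
copy-on-write (the pandas 3.0 default) the inplace call can modify a temporary
object instead of df_transformed. A sketch of the assignment form, which does
the same imputation but stays copy-on-write-safe, using a made-up two-column
frame:

    import pandas as pd

    df = pd.DataFrame({"salary": [50000.0, None, 64000.0],
                       "name": ["Ada", None, "Grace"]})

    # Same imputation, written as plain column assignment
    df["salary"] = df["salary"].fillna(df["salary"].mean())
    df["name"] = df["name"].fillna("Unknown")
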
diff --git a/app/main.py b/app/main.py
index e61920b..d24ed28 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,7 +1,12 @@
+import logging
 from app.etl.extract import extract
 from app.etl.transform import transform
 from app.etl.load import load
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
     """
     Run the complete ETL pipeline.
@@ -11,41 +16,46 @@ def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
         db_path: Path to the output SQLite database
     """
     try:
-        print("🚀 Starting ETL Pipeline")  # TODO (Find & Fix): Use logging instead of print
-        print(f"📁 Input file: {csv_path}")
-        print(f"🗄️ Output database: {db_path}")
-        print("-" * 50)
+        logging.info("🚀 Starting ETL Pipeline")
+        logging.info(f"📁 Input file: {csv_path}")
+        logging.info(f"🗄️ Output database: {db_path}")
+        logging.info("-" * 50)
 
         # Extract
-        print("📥 STEP 1: EXTRACT")
+        logging.info("📥 STEP 1: EXTRACT")
         df = extract(csv_path)
-        print(f"✅ Extracted {len(df)} rows")
-        print(f"📊 Columns: {list(df.columns)}")
-        print()
+        logging.info(f"✅ Extracted {len(df)} rows")
+        logging.info(f"📊 Columns: {list(df.columns)}")
+        logging.info("-" * 50)
+
 
         # Transform
-        print("🔄 STEP 2: TRANSFORM")
+        logging.info("🔄 STEP 2: TRANSFORM")
         df_transformed = transform(df)
-        print(f"✅ Transformed data ready")
-        print()
+        logging.info("✅ Transformed data ready")
+        logging.info("-" * 50)
+
 
         # Load
-        print("📤 STEP 3: LOAD")
+        logging.info("📤 STEP 3: LOAD")
         load(df_transformed, db_path)
-        print()
+        logging.info("-" * 50)
+
 
-        print("🎉 ETL Pipeline completed successfully!")
-        print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
+        logging.info("🎉 ETL Pipeline completed successfully!")
+        logging.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
 
     except FileNotFoundError as e:
-        print(f"❌ File Error: {e}")
+        logging.error(f"❌ File Error: {e}")
     except ValueError as e:
+        logging.error(f"⚠️ ValueError: {e}")
+        raise
         # TODO (Find & Fix): Error handling missing
-        pass
+
     except Exception as e:
         # TODO (Find & Fix): Error handling missing
-        pass
+        logging.exception(f"🔥 Unexpected error: {e}")
 
 
 if __name__ == "__main__":
     # Run the pipeline
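
Reviewer note (not part of the patch): logging.basicConfig is now called in
extract.py, load.py, transform.py, and main.py; only the first call to reach
the root logger takes effect and the rest are silent no-ops, so the duplicates
are harmless but could live in main.py alone. Because main.py imports
app.etl.* absolutely, the pipeline should be launched from the repository root
with the module flag, or driven programmatically (assuming app/ and app/etl/
contain __init__.py files):

    # shell: python -m app.main
    from app.main import run_pipeline

    run_pipeline(csv_path="data.csv", db_path="etl_data.db")
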
From 3f0040908fb7e8ab3a3acb059d0b94de46619851 Mon Sep 17 00:00:00 2001
From: Ayush
Date: Fri, 7 Nov 2025 09:17:34 +0530
Subject: [PATCH 2/3] chore: add .gitignore to ignore caches and env files

---
 .gitignore | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index b512c09..a00cb39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,21 @@
-node_modules
\ No newline at end of file
+# Python cache files
+__pycache__/
+*.pyc
+*.pyo
+
+# Environments
+.venv/
+env/
+venv/
+
+# Node modules
+node_modules/
+
+# IDE
+.vscode/
+.idea/
+
+# OS junk
+.DS_Store
+Thumbs.db
+desktop.ini

From 94bfa3a1024f34eba24ffcbe89ef574fdede6d0c Mon Sep 17 00:00:00 2001
From: Ayush
Date: Fri, 7 Nov 2025 09:17:37 +0530
Subject: [PATCH 3/3] chore: remove __pycache__ from tracking and restore
 package-lock.json

---
 app/etl/__pycache__/extract.cpython-313.pyc   | Bin 2403 -> 0 bytes
 app/etl/__pycache__/load.cpython-313.pyc      | Bin 3836 -> 0 bytes
 app/etl/__pycache__/transform.cpython-313.pyc | Bin 3836 -> 0 bytes
 3 files changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 app/etl/__pycache__/extract.cpython-313.pyc
 delete mode 100644 app/etl/__pycache__/load.cpython-313.pyc
 delete mode 100644 app/etl/__pycache__/transform.cpython-313.pyc

diff --git a/app/etl/__pycache__/extract.cpython-313.pyc b/app/etl/__pycache__/extract.cpython-313.pyc
deleted file mode 100644
index 10b5268f18bdd97a5d8c39f25a44837eae8778f8..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

diff --git a/app/etl/__pycache__/load.cpython-313.pyc b/app/etl/__pycache__/load.cpython-313.pyc
deleted file mode 100644
GIT binary patch
literal 0
HcmV?d00001

diff --git a/app/etl/__pycache__/transform.cpython-313.pyc b/app/etl/__pycache__/transform.cpython-313.pyc
deleted file mode 100644
index 4347539d440f62a3b68168ea3a8a8897d4d33528..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001