Skip to content

Commit 6e5d072

Browse files
Merge pull request #25 from Satvik-Singh192/feat/add-load-idempotency-12
feat: implemented idempotency
2 parents 916e6c7 + ed9be5b commit 6e5d072

File tree

1 file changed

+26
-3
lines changed

1 file changed

+26
-3
lines changed

app/etl/load.py

Lines changed: 26 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,10 +32,33 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
3232

3333
# TODO (Find & Fix): Table creation and schema logic missing
3434

35-
# TODO (Find & Fix): Idempotency check missing (should avoid duplicate inserts)
36-
# TODO (Find & Fix): Bulk insert without checking for duplicates
37-
df.to_sql(table_name, conn, if_exists="append", index=False)
35+
# Idempotency check (should avoid duplicate inserts)
36+
cursor.execute(f"""
37+
CREATE TABLE IF NOT EXISTS {table_name} (
38+
employee_id INTEGER PRIMARY KEY,
39+
name TEXT,
40+
email TEXT,
41+
age INTEGER,
42+
department TEXT,
43+
job_title TEXT,
44+
salary REAL,
45+
city TEXT,
46+
hire_date TEXT,
47+
performance_rating REAL,
48+
phone TEXT,
49+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
50+
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
51+
)
52+
""")
53+
54+
data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
55+
placeholders = ", ".join(["?"] * len(df.columns))
56+
column_names = ", ".join(df.columns)
57+
sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
58+
cursor.executemany(sql_query, data_to_insert)
3859
conn.commit()
60+
# TODO (Find & Fix): Bulk insert without checking for duplicates
61+
3962

4063
except sqlite3.Error as e:
4164
if conn:

0 commit comments

Comments
 (0)