Skip to content

Commit e29ba26

Browse files
authored
Merge pull request #5 from rrad0812/develop
Add scaffold framework and SQL-to-Neo4j sync
2 parents bcf437a + 9b5679a commit e29ba26

18 files changed

+1643
-567
lines changed
16 KB
Binary file not shown.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"person": "2025-10-19 20:24:55",
3+
"project": "2025-10-19 20:24:55",
4+
"person_project": "2025-10-19 20:24:55"
5+
}
Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
import sqlite3
2+
from neo4j import GraphDatabase
3+
import pandas as pd
4+
import hashlib, json, requests, sys
5+
from collections import defaultdict
6+
from datetime import datetime
7+
from pathlib import Path
8+
9+
# =========================
# CONFIG
# =========================
# Connection settings for the target Neo4j instance (Bolt protocol).
NEO4J_URI = "bolt://localhost:7687"
NEO4J_AUTH = ("neo4j", "rrad0812")  # NOTE(review): credentials hard-coded; consider environment variables

# JSON file holding per-table "last synced" timestamps.
SYNC_LOG_FILE = Path("importer/sync_log.json")

# True = simulation (does NOT write to Neo4j / log); False = writes changes
DRY_RUN = False

# ANSI colors for nicer log output (on/off)
USE_COLOR = True
22+
23+
24+
# =========================
25+
# UTIL: boje & ispisi
26+
# =========================
27+
def c(code, s):
    """Wrap *s* in the ANSI escape sequence *code* (e.g. "32" = green).

    Returns *s* unchanged when USE_COLOR is disabled.
    """
    return f"\033[{code}m{s}\033[0m" if USE_COLOR else s
31+
32+
def info(msg):
    """Print an informational message in cyan."""
    print(c("36", f"[i] {msg}"))


def ok(msg):
    """Print a success message in green."""
    print(c("32", f"[✓] {msg}"))


def warn(msg):
    """Print a warning message in yellow."""
    print(c("33", f"[!] {msg}"))


def err(msg):
    """Print an error message in red."""
    print(c("31", f"[x] {msg}"))
36+
37+
38+
# =========================
39+
# DATA SOURCE ADAPTERS
40+
# =========================
41+
def load_sqlite_dataset(db_path):
    """Load every table of an SQLite database into memory.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        dict mapping table name -> list of row dicts (one per record).
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [r[0] for r in cur.fetchall()]
        dataset = {}
        for t in tables:
            # Quote the identifier: names come from sqlite_master, but quoting
            # keeps unusual table names (spaces, keywords) working.
            df = pd.read_sql_query(f'SELECT * FROM "{t}"', conn)
            dataset[t] = df.to_dict(orient="records")
    finally:
        # Bug fix: the connection was previously never closed.
        conn.close()
    ok(f"SQLite učitan ({db_path}) sa {len(dataset)} tabela.")
    return dataset
52+
53+
def load_csv_dataset(csv_dir):
    """Load every ``*.csv`` file in *csv_dir* as one table per file.

    The file stem becomes the table name; rows become dicts.
    """
    dataset = {
        path.stem: pd.read_csv(path).to_dict(orient="records")
        for path in Path(csv_dir).glob("*.csv")
    }
    ok(f"CSV folder '{csv_dir}' učitan sa {len(dataset)} tabela.")
    return dataset
61+
62+
def load_json_dataset(json_path):
    """Load a JSON file shaped as ``{table_name: [row, ...], ...}``."""
    raw = Path(json_path).read_text(encoding="utf-8")
    data = json.loads(raw)
    ok(f"JSON '{json_path}' učitan sa {len(data)} tabela.")
    return data
66+
67+
def load_api_dataset(api_url):
    """Fetch ``{table_name: [row, ...], ...}`` from an HTTP endpoint.

    Raises on non-2xx responses; times out after 60 seconds.
    """
    response = requests.get(api_url, timeout=60)
    response.raise_for_status()
    data = response.json()
    ok(f"API '{api_url}' učitan sa {len(data)} tabela.")
    return data
73+
74+
75+
# =========================
76+
# SYNC UTILS
77+
# =========================
78+
def make_hash(row):
    """Stable MD5 fingerprint of a row dict (keys sorted, None -> "")."""
    parts = []
    for key in sorted(row):
        value = row[key]
        parts.append(f"{key}:{'' if value is None else value}")
    # MD5 is used only as a cheap change detector, not for security.
    return hashlib.md5("|".join(parts).encode()).hexdigest()
82+
83+
def make_rel_hash(a_id, b_id, props_dict):
    """MD5 fingerprint of a relationship: endpoints plus sorted properties.

    Only used for DRY-RUN diff logging.
    """
    prop_str = "|".join(f"{key}:{props_dict.get(key)}" for key in sorted(props_dict))
    fingerprint = f"a:{a_id}|b:{b_id}|{prop_str}"
    return hashlib.md5(fingerprint.encode()).hexdigest()
87+
88+
def load_sync_log():
    """Read the per-table sync-timestamp log; {} when missing or corrupt."""
    if not SYNC_LOG_FILE.exists():
        return {}
    try:
        return json.loads(SYNC_LOG_FILE.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        # A corrupted log is treated as "never synced".
        return {}
95+
96+
def save_sync_log(data):
    """Persist the sync-timestamp log as pretty-printed UTF-8 JSON."""
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    SYNC_LOG_FILE.write_text(payload, encoding="utf-8")
98+
99+
def log_sync_time(sync_data, table):
    """Record "now" (local time, second precision) as *table*'s last sync."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sync_data[table] = stamp
101+
102+
def guess_relation_name(from_col, to_table):
    """Derive a relationship type from an FK column and its target table.

    Example: ("person_id", "project") -> "PERSON_REL_PROJECT".
    """
    base = str(from_col).lower()
    if base.endswith("_id"):
        base = base[: -len("_id")]
    target = str(to_table).lower()
    return f"{base.upper()}_REL_{target.upper()}"
108+
109+
def title_label(name: str) -> str:
    """Node label from a table name: only the first letter is upper-cased.

    "account_user" -> "Account_user"; an empty name falls back to "X".
    """
    return f"{name[0].upper()}{name[1:]}" if name else "X"
114+
115+
116+
# =========================================================
117+
# HEURISTIČKO PREPOZNAVANJE FK I JOIN TABELA (CSV/JSON/API)
118+
# =========================================================
119+
def detect_fk_columns(table, rows):
    """Return the columns of *rows* that look like foreign keys (``*_id``).

    Args:
        table: Table name (kept for interface symmetry; unused here).
        rows: List of row dicts; keys may differ between rows.

    Returns:
        Sorted list of FK-looking column names ([] for an empty table).
        Sorting is the fix: set iteration order previously made the result
        nondeterministic, which flipped join-relation direction between runs.
    """
    if not rows:
        return []
    # Union of keys across all rows handles heterogeneous records.
    keys = set().union(*(r.keys() for r in rows))
    return sorted(k for k in keys if k.endswith("_id"))
126+
127+
def is_join_table_heuristic(table, rows, dataset):
    """Decide whether *table* is a pure join table.

    A table counts as a join table when it has exactly two ``*_id``
    columns and both referenced base tables exist in *dataset*.

    Returns:
        (is_join, fk_columns) — fk_columns is [] when the table does not
        have exactly two FK-looking columns.
    """
    fks = detect_fk_columns(table, rows)
    if len(fks) != 2:
        return False, []
    left, right = (fk[:-3] for fk in fks)
    both_known = left in dataset and right in dataset
    return both_known, fks
141+
142+
def extra_rel_props(table, rows, fk_cols):
    """Non-FK columns of a join table — these become relationship properties."""
    if not rows:
        return []
    all_keys = set()
    for record in rows:
        all_keys.update(record.keys())
    excluded = set(fk_cols)
    return [key for key in all_keys if key not in excluded]
148+
149+
150+
# =========================
151+
# SMART SYNC ENGINE
152+
# =========================
153+
def smart_sync(dataset, driver):
    """Differential sync of *dataset* (dict: table -> list of row dicts) into Neo4j.

    Phases:
      1. Upsert nodes (skipping heuristic join tables), using per-row MD5
         hashes to detect unchanged rows.
      2. Create relationships: join tables become direct relations (extra
         columns become relationship properties); other ``*_id`` columns
         become plain 1:N relations.
      3. Delete "ghost" nodes that no longer exist in the source.
      4. Write meta tags (per-node relationship count) and the sync log.
      5. Print a summary.

    Honors the module-level DRY_RUN flag: when True, nothing is written
    to Neo4j or to the sync log.
    """
    with driver.session() as session:
        # Per-table node counters and per-relation-type counters.
        stats_nodes = defaultdict(lambda: {"new": 0, "updated": 0, "deleted": 0})
        stats_rels = defaultdict(int)
        sync_data = load_sync_log()
        # (table, id) pairs seen in the source — consumed by the cleanup phase.
        all_existing_nodes = set()

        tables = list(dataset.keys())
        info(f"Pronađeno tabela: {len(tables)}")

        # 0) Pre-compute join tables so we can skip them as nodes.
        join_tables = {}
        for t in tables:
            is_join, fks = is_join_table_heuristic(t, dataset[t], dataset)
            if is_join:
                join_tables[t] = fks
        if join_tables:
            info("Join tabele (heuristika): " + ", ".join(join_tables.keys()))

        # 1) NODES (INSERT/UPDATE) — skip join tables
        print(c("1;44", "\n=== NODE SYNC ==="))
        for table in tables:
            # Skip pure join tables (they produce no nodes).
            if table in join_tables:
                info(f"Preskačem čvorove za join tabelu '{table}'")
                log_sync_time(sync_data, table)
                continue

            rows = dataset[table]
            for row in rows:
                if "id" not in row:
                    warn(f"{table}: red bez 'id' – preskačem: {row}")
                    continue

                row_hash = make_hash(row)
                # NOTE: mutates the source row — the hash is stored on the node too.
                row["_hash"] = row_hash
                all_existing_nodes.add((table, row["id"]))

                # Fetch the stored hash for this node (None when the node is new).
                res = session.run(
                    f"MATCH (n:{title_label(table)} {{id: $id}}) RETURN n._hash AS h",
                    id=row["id"]
                ).single()

                if res and res["h"] == row_hash:
                    # No change.
                    continue

                if res:
                    stats_nodes[table]["updated"] += 1
                    print(c("36", f"& UPDATE → {table} id={row['id']}"))
                else:
                    stats_nodes[table]["new"] += 1
                    print(c("32", f"+ INSERT → {table} id={row['id']}"))

                if not DRY_RUN:
                    # NOTE(review): column and table names are interpolated
                    # straight into Cypher — safe only while the source
                    # schema is trusted.
                    props = ", ".join(f"n.{k} = ${k}" for k in row.keys())
                    cypher = f"""
                        MERGE (n:{title_label(table)} {{id: $id}})
                        ON CREATE SET {props}, n.tip = '{table}'
                        ON MATCH SET {props}
                    """
                    session.run(cypher, **row)

            log_sync_time(sync_data, table)

        # 2) RELATIONSHIPS
        print(c("1;44", "\n=== RELATION SYNC ==="))
        for table in tables:
            rows = dataset[table]

            # (A) Join table ⇒ direct relations (with props when extra columns exist)
            if table in join_tables:
                fk1, fk2 = join_tables[table]
                base1, base2 = fk1[:-3], fk2[:-3]
                rel_name = f"{base1.upper()}_{base2.upper()}_REL"
                props_cols = extra_rel_props(table, rows, [fk1, fk2])

                info(f"{table}: join → {base1}{base2} [{rel_name}] (props={bool(props_cols)})")

                for row in rows:
                    a_id = row.get(fk1)
                    b_id = row.get(fk2)
                    # Rows with a missing endpoint cannot form a relation.
                    if a_id is None or b_id is None:
                        continue

                    params = {"a_id": a_id, "b_id": b_id}
                    # NOTE(review): props_set is built but never used below —
                    # the SET clause inside the Cypher string does the work.
                    props_set = ""
                    if props_cols:
                        for p in props_cols:
                            params[p] = row.get(p, None)
                        props_set = " {" + ", ".join(f"{p}: ${p}" for p in props_cols) + "}"

                    stats_rels[rel_name] += 1
                    print(c("35", f"MERGE rel → {base1}({a_id}) -[{rel_name}{' props' if props_cols else ''}]-> {base2}({b_id})"))

                    if not DRY_RUN:
                        cypher = f"""
                            MATCH (a:{title_label(base1)} {{id: $a_id}}),
                                  (b:{title_label(base2)} {{id: $b_id}})
                            MERGE (a)-[r:{rel_name}]->(b)
                            {"SET " + ", ".join(f"r.{p} = ${p}" for p in props_cols) if props_cols else ""}
                        """
                        session.run(cypher, **params)

                continue  # next table

            # (B) Regular FK relations — each *_id column links to {base}.id
            #     when that base table exists in the dataset.
            fk_cols = detect_fk_columns(table, rows)
            for fk in fk_cols:
                ref = fk[:-3]
                if ref not in dataset:
                    continue  # target table not in the dataset → skip

                rel_name = guess_relation_name(fk, ref)
                info(f"{table}.{fk} -> {ref}.id [{rel_name}]")

                # No "props" — these are plain 1:N links.
                if not DRY_RUN:
                    cypher = f"""
                        MATCH (a:{title_label(table)}), (b:{title_label(ref)})
                        WHERE a.{fk} = b.id
                        MERGE (a)-[:{rel_name}]->(b)
                    """
                    session.run(cypher)
                # Statistics (approximate — rows are not counted individually, for speed).
                stats_rels[rel_name] += len(rows) if rows else 0

        # 3) DELETE "ghosts" (nodes that no longer exist in the source)
        print(c("1;44", "\n=== CLEANUP ==="))
        for table in tables:
            if table in join_tables:
                continue  # join tables generate no nodes

            ids_src = [i for (t, i) in all_existing_nodes if t == table]
            res = session.run(
                f"MATCH (n:{title_label(table)}) RETURN collect(n.id) AS ids"
            ).single()
            ids_neo = res["ids"] if res and res["ids"] else []
            # O(n*m) membership scan — presumably fine for small datasets.
            to_delete = [i for i in ids_neo if i not in ids_src]

            if to_delete:
                stats_nodes[table]["deleted"] += len(to_delete)
                for i in to_delete:
                    print(c("31", f"× DELETE → {table} id={i}"))
                if not DRY_RUN:
                    session.run(
                        f"MATCH (n:{title_label(table)}) WHERE n.id IN $ids DETACH DELETE n",
                        ids=to_delete,
                    )

        # 4) META TAGS & LOG
        if not DRY_RUN:
            # Meta: relationship count per node.
            session.run("""
                MATCH (n)
                OPTIONAL MATCH (n)-[r]-()
                WITH n, count(r) AS broj_veza
                SET n.count_of_rel = broj_veza
            """)
            save_sync_log(sync_data)
            ok("Sinhronizacija upisana i meta tagovi ažurirani.")
        else:
            warn("DRY-RUN je uključen: Neo4j i sync log NISU menjani.")

        # 5) SUMMARY
        print(c("1;44", "\n=== REZIME ==="))
        total_new = total_upd = total_del = 0
        for t, s in stats_nodes.items():
            n, u, d = s["new"], s["updated"], s["deleted"]
            if n or u or d:
                print(f"{title_label(t):18} → + {n:3} & {u:3} × {d:3}")
                total_new += n; total_upd += u; total_del += d
        print(f"\nČVOROVI → + {total_new} & {total_upd} × {total_del}")

        if stats_rels:
            print("\nVEZE:")
            for rname, cnt in stats_rels.items():
                print(f" {rname:28} ~ {cnt}")
        print("\n✅ Universal Smart Diff Sync v3.1 završeno.\n")
332+
333+
334+
# =========================
335+
# MAIN
336+
# =========================
337+
if __name__ == "__main__":
    driver = GraphDatabase.driver(NEO4J_URI, auth=NEO4J_AUTH)
    try:
        # === CHOOSE A SOURCE (uncomment exactly one line) ===
        # dataset = load_sqlite_dataset("importer/example.db")
        # dataset = load_csv_dataset("data/")  # e.g. data/person.csv, data/project.csv, data/person_project.csv
        # dataset = load_json_dataset("data.json")  # JSON: { "person":[...], "project":[...], "person_project":[...] }
        # dataset = load_api_dataset("https://example.com/api/export")

        # Example: SQLite
        dataset = load_sqlite_dataset("importer/example.db")

        smart_sync(dataset, driver)
    finally:
        # Bug fix: the driver (Bolt connection pool) was previously never
        # closed; release it even when loading or syncing fails.
        driver.close()

0 commit comments

Comments
 (0)