Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 24 additions & 25 deletions src/mysql_to_sqlite3/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,37 +895,37 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) ->
self._logger.error("SQLite failed creating table %s: %s", table_name, err)
raise

@staticmethod
def _mysql_viewdef_to_sqlite(
view_select_sql: str,
view_name: str,
schema_name: t.Optional[str] = None,
keep_schema: bool = False,
) -> str:
"""
Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement.

If keep_schema is False and schema_name is provided, strip qualifiers like `example`.table.
If keep_schema is True, you must ATTACH the SQLite database as that schema name before using the view.
"""
def _mysql_viewdef_to_sqlite(self, view_select_sql: str, view_name: str) -> str:
"""Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement."""
# Normalize whitespace and avoid double semicolons in output
cleaned_sql = view_select_sql.strip().rstrip(";")

try:
tree = parse_one(cleaned_sql, read="mysql")
except (ParseError, ValueError, Exception): # pylint: disable=W0718
# Fallback: return a basic CREATE VIEW using the original SELECT
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{cleaned_sql};'

if not keep_schema and schema_name:
# Remove schema qualifiers that match schema_name
for tbl in tree.find_all(exp.Table):
db = tbl.args.get("db")
if db and db.name.strip('`"') == schema_name:
tbl.set("db", None)
except (ParseError, ValueError, AttributeError, TypeError):
# Fallback: try to remove schema qualifiers if requested, then return
stripped_sql = cleaned_sql
# Remove qualifiers `schema`.tbl or "schema".tbl or schema.tbl
sn = re.escape(self._mysql_database)
for pat in (rf"`{sn}`\.", rf'"{sn}"\.', rf"\b{sn}\."):
stripped_sql = re.sub(pat, "", stripped_sql)
view_ident = self._quote_sqlite_identifier(view_name)
return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{stripped_sql};"

# Remove schema qualifiers that match schema_name on tables
for tbl in tree.find_all(exp.Table):
db = tbl.args.get("db")
if db and db.name.strip('`"') == self._mysql_database:
tbl.set("db", None)
# Also remove schema qualifiers on fully-qualified columns (db.table.column)
for col in tree.find_all(exp.Column):
db = col.args.get("db")
if db and db.name.strip('`"') == self._mysql_database:
col.set("db", None)

sqlite_select = tree.sql(dialect="sqlite")
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{sqlite_select};'
view_ident = self._quote_sqlite_identifier(view_name)
return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{sqlite_select};"

def _build_create_view_sql(self, view_name: str) -> str:
"""Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition."""
Expand Down Expand Up @@ -990,7 +990,6 @@ def _build_create_view_sql(self, view_name: str) -> str:
return self._mysql_viewdef_to_sqlite(
view_name=view_name,
view_select_sql=definition,
schema_name=self._mysql_database,
)

def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None:
Expand Down
5 changes: 1 addition & 4 deletions tests/unit/test_views_build_paths_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ def test_build_create_view_sql_information_schema_bytes_decode_failure_falls_bac

captured = {}

def fake_converter(*, view_select_sql: str, view_name: str, schema_name: str, keep_schema: bool = False) -> str:
def fake_converter(*, view_select_sql: str, view_name: str) -> str:
captured["view_select_sql"] = view_select_sql
captured["view_name"] = view_name
captured["schema_name"] = schema_name
captured["keep_schema"] = keep_schema
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS SELECT 1;'

monkeypatch.setattr(MySQLtoSQLite, "_mysql_viewdef_to_sqlite", staticmethod(fake_converter))
Expand All @@ -39,7 +37,6 @@ def fake_converter(*, view_select_sql: str, view_name: str, schema_name: str, ke

# Converter was invoked with the string representation of the undecodable bytes
assert captured["view_name"] == "v_strange"
assert captured["schema_name"] == "db"
assert isinstance(captured["view_select_sql"], str)
# And a CREATE VIEW statement was produced
assert sql.startswith('CREATE VIEW IF NOT EXISTS "v_strange" AS')
Expand Down
8 changes: 1 addition & 7 deletions tests/unit/test_views_create_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@ def capture_execute(sql: str) -> None:
# Capture the definition passed to _mysql_viewdef_to_sqlite and return a dummy SQL
captured: t.Dict[str, str] = {}

def fake_mysql_viewdef_to_sqlite(
*, view_select_sql: str, view_name: str, schema_name: t.Optional[str] = None, keep_schema: bool = False
) -> str:
def fake_mysql_viewdef_to_sqlite(*, view_select_sql: str, view_name: str) -> str:
captured["select"] = view_select_sql
captured["view_name"] = view_name
captured["schema_name"] = schema_name or ""
captured["keep_schema"] = str(keep_schema)
return 'CREATE VIEW IF NOT EXISTS "dummy" AS SELECT 1;'

monkeypatch.setattr(MySQLtoSQLite, "_mysql_viewdef_to_sqlite", staticmethod(fake_mysql_viewdef_to_sqlite))
Expand All @@ -64,5 +60,3 @@ def fake_mysql_viewdef_to_sqlite(
assert captured["select"] == "SELECT 1 AS `x`"
# Check view_name was threaded unchanged to the converter
assert captured["view_name"] == "we`ird"
# Schema name also provided
assert captured["schema_name"] == "db"
68 changes: 59 additions & 9 deletions tests/unit/test_views_sqlglot.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from unittest.mock import patch

import pytest

Expand All @@ -8,10 +9,13 @@
class TestViewsSqlglot:
def test_mysql_viewdef_to_sqlite_strips_schema_and_transpiles(self) -> None:
mysql_select = "SELECT `u`.`id`, `u`.`name` FROM `db`.`users` AS `u` WHERE `u`.`id` > 1"
sql = MySQLtoSQLite._mysql_viewdef_to_sqlite(
# Use an instance to ensure access to _mysql_database for stripping
with patch.object(MySQLtoSQLite, "__init__", return_value=None):
inst = MySQLtoSQLite() # type: ignore[call-arg]
inst._mysql_database = "db" # type: ignore[attr-defined]
sql = inst._mysql_viewdef_to_sqlite(
view_select_sql=mysql_select,
view_name="v_users",
schema_name="db",
)
assert sql.startswith('CREATE VIEW IF NOT EXISTS "v_users" AS')
# Ensure schema qualifier was removed
Expand All @@ -32,23 +36,69 @@ def boom(*args, **kwargs):
monkeypatch.setattr("mysql_to_sqlite3.transporter.parse_one", boom)

sql_in = "SELECT 1"
out = MySQLtoSQLite._mysql_viewdef_to_sqlite(
with patch.object(MySQLtoSQLite, "__init__", return_value=None):
inst = MySQLtoSQLite() # type: ignore[call-arg]
inst._mysql_database = "db" # type: ignore[attr-defined]
out = inst._mysql_viewdef_to_sqlite(
view_select_sql=sql_in,
view_name="v1",
schema_name="db",
)
assert out.startswith('CREATE VIEW IF NOT EXISTS "v1" AS')
assert "SELECT 1" in out
assert out.strip().endswith(";")

def test_mysql_viewdef_to_sqlite_parse_fallback_strips_schema(self, monkeypatch: pytest.MonkeyPatch) -> None:
# Force parse_one to raise so we exercise the fallback path with schema qualifiers
from sqlglot.errors import ParseError

def boom(*args, **kwargs):
raise ParseError("boom")

monkeypatch.setattr("mysql_to_sqlite3.transporter.parse_one", boom)

mysql_select = "SELECT `u`.`id` FROM `db`.`users` AS `u` WHERE `u`.`id` > 1"
with patch.object(MySQLtoSQLite, "__init__", return_value=None):
inst = MySQLtoSQLite() # type: ignore[call-arg]
inst._mysql_database = "db" # type: ignore[attr-defined]
out = inst._mysql_viewdef_to_sqlite(
view_select_sql=mysql_select,
view_name="v_users",
)
# Should not contain schema qualifier anymore
assert "`db`." not in out and '"db".' not in out and " db." not in out
# Should still reference the table name
assert "FROM `users`" in out or 'FROM "users"' in out or "FROM users" in out
assert out.strip().endswith(";")

def test_mysql_viewdef_to_sqlite_strips_schema_from_qualified_columns_nested(self) -> None:
# Based on the user-reported example with nested subquery and fully-qualified columns
mysql_sql = (
"select `p`.`instrument_id` AS `instrument_id`,`p`.`price_date` AS `price_date`,`p`.`close` AS `close` "
"from (`example`.`prices` `p` join (select `example`.`prices`.`instrument_id` AS `instrument_id`,"
"max(`example`.`prices`.`price_date`) AS `max_date` from `example`.`prices` group by "
"`example`.`prices`.`instrument_id`) `t` on(((`t`.`instrument_id` = `p`.`instrument_id`) and "
"(`t`.`max_date` = `p`.`price_date`))))"
)
with patch.object(MySQLtoSQLite, "__init__", return_value=None):
inst = MySQLtoSQLite() # type: ignore[call-arg]
inst._mysql_database = "example" # type: ignore[attr-defined]
out = inst._mysql_viewdef_to_sqlite(view_select_sql=mysql_sql, view_name="v_prices")
# Ensure all schema qualifiers are removed, including on qualified columns inside subqueries
assert '"example".' not in out and "`example`." not in out and " example." not in out
# Still references the base table name
assert 'FROM "prices"' in out or 'FROM ("prices"' in out or "FROM prices" in out
assert out.strip().endswith(";")

def test_mysql_viewdef_to_sqlite_keep_schema_true_preserves_qualifiers(self) -> None:
mysql_select = "SELECT `u`.`id` FROM `db`.`users` AS `u`"
sql = MySQLtoSQLite._mysql_viewdef_to_sqlite(
# Use instance for consistent attribute access
with patch.object(MySQLtoSQLite, "__init__", return_value=None):
inst = MySQLtoSQLite() # type: ignore[call-arg]
inst._mysql_database = "db" # type: ignore[attr-defined]
# Since keep_schema behavior is no longer parameterized, ensure that if schema matches current db, it is stripped
sql = inst._mysql_viewdef_to_sqlite(
view_select_sql=mysql_select,
view_name="v_users",
schema_name="db",
keep_schema=True,
)
# Should not strip the schema when keep_schema=True
assert "`db`." in sql or '"db".' in sql
assert "`db`." not in sql and '"db".' not in sql
assert sql.strip().endswith(";")
Loading