Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies = [
"python-dateutil>=2.9.0.post0",
"python-slugify>=7.0.0",
"simplejson>=3.19.0",
"sqlglot>=27.27.0",
"tqdm>=4.65.0",
"tabulate",
"typing-extensions; python_version < \"3.11\"",
Expand Down
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ simplejson>=3.19.1
types-simplejson
sqlalchemy>=2.0.0
sqlalchemy-utils
sqlglot>=27.27.0
types-sqlalchemy-utils
tox
tqdm>=4.65.0
Expand Down
8 changes: 8 additions & 0 deletions src/mysql_to_sqlite3/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@
)
@click.option("-l", "--log-file", type=click.Path(), help="Log file")
@click.option("--json-as-text", is_flag=True, help="Transfer JSON columns as TEXT.")
@click.option(
"-T",
"--mysql-views-as-tables",
is_flag=True,
help="Materialize MySQL VIEWs as SQLite tables (legacy behavior).",
)
@click.option(
"-V",
"--vacuum",
Expand Down Expand Up @@ -182,6 +188,7 @@ def cli(
chunk: int,
log_file: t.Union[str, "os.PathLike[t.Any]"],
json_as_text: bool,
mysql_views_as_tables: bool,
vacuum: bool,
use_buffered_cursors: bool,
quiet: bool,
Expand Down Expand Up @@ -230,6 +237,7 @@ def cli(
mysql_ssl_disabled=skip_ssl,
chunk=chunk,
json_as_text=json_as_text,
views_as_views=not mysql_views_as_tables,
vacuum=vacuum,
buffered=use_buffered_cursors,
log_file=log_file,
Expand Down
173 changes: 165 additions & 8 deletions src/mysql_to_sqlite3/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from mysql.connector import CharacterSet, errorcode
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.types import RowItemType
from sqlglot import exp, parse_one
from sqlglot.errors import ParseError
from tqdm import tqdm, trange


Expand Down Expand Up @@ -120,6 +122,8 @@ def __init__(self, **kwargs: Unpack[MySQLtoSQLiteParams]) -> None:

self._quiet = bool(kwargs.get("quiet", False))

self._views_as_views = bool(kwargs.get("views_as_views", True))

self._sqlite_strict = bool(kwargs.get("sqlite_strict", False))

self._logger = self._setup_logger(log_file=kwargs.get("log_file") or None, quiet=self._quiet)
Expand Down Expand Up @@ -637,6 +641,7 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) ->
if not attempting_reconnect:
self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.")
self._create_table(table_name, True)
return
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
Expand All @@ -650,6 +655,130 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) ->
self._logger.error("SQLite failed creating table %s: %s", table_name, err)
raise

@staticmethod
def _mysql_viewdef_to_sqlite(
view_select_sql: str,
view_name: str,
schema_name: t.Optional[str] = None,
keep_schema: bool = False,
) -> str:
"""
Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement.

If keep_schema is False and schema_name is provided, strip qualifiers like `example`.table.
If keep_schema is True, you must ATTACH the SQLite database as that schema name before using the view.
"""
# Normalize whitespace and avoid double semicolons in output
cleaned_sql = view_select_sql.strip().rstrip(";")

try:
tree = parse_one(cleaned_sql, read="mysql")
except (ParseError, ValueError, Exception): # pylint: disable=W0718
# Fallback: return a basic CREATE VIEW using the original SELECT
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{cleaned_sql};'

if not keep_schema and schema_name:
# Remove schema qualifiers that match schema_name
for tbl in tree.find_all(exp.Table):
db = tbl.args.get("db")
if db and db.name.strip('`"') == schema_name:
tbl.set("db", None)

sqlite_select = tree.sql(dialect="sqlite")
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{sqlite_select};'

def _build_create_view_sql(self, view_name: str) -> str:
"""Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition."""
# Try to obtain the view definition from information_schema.VIEWS
definition: t.Optional[str] = None
try:
self._mysql_cur_dict.execute(
"""
SELECT VIEW_DEFINITION AS `definition`
FROM information_schema.VIEWS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
""",
(self._mysql_database, view_name),
)
row: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone()
if row is not None and row.get("definition") is not None:
val = row["definition"]
if isinstance(val, bytes):
try:
definition = val.decode()
except UnicodeDecodeError:
definition = str(val)
else:
definition = t.cast(str, val)
except mysql.connector.Error:
# Fall back to SHOW CREATE VIEW below
definition = None

if not definition:
# Fallback: use SHOW CREATE VIEW and extract the SELECT part
try:
# Escape backticks in the MySQL view name for safe interpolation
safe_view_name = view_name.replace("`", "``")
self._mysql_cur.execute(f"SHOW CREATE VIEW `{safe_view_name}`")
res = self._mysql_cur.fetchone()
if res and len(res) >= 2:
create_stmt = res[1]
if isinstance(create_stmt, bytes):
try:
create_stmt_str = create_stmt.decode()
except UnicodeDecodeError:
create_stmt_str = str(create_stmt)
else:
create_stmt_str = t.cast(str, create_stmt)
# Extract the SELECT ... part after AS (supporting newlines)
m = re.search(r"\bAS\b\s*(.*)$", create_stmt_str, re.IGNORECASE | re.DOTALL)
if m:
definition = m.group(1).strip().rstrip(";")
else:
# As a last resort, try to use the full statement replacing the prefix
# Not ideal, but better than failing outright
idx = create_stmt_str.upper().find(" AS ")
if idx != -1:
definition = create_stmt_str[idx + 4 :].strip().rstrip(";")
except mysql.connector.Error:
pass

if not definition:
raise sqlite3.Error(f"Unable to fetch definition for MySQL view '{view_name}'")

return self._mysql_viewdef_to_sqlite(
view_name=view_name,
view_select_sql=definition,
schema_name=self._mysql_database,
)

def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None:
try:
if attempting_reconnect:
self._mysql.reconnect()
sql = self._build_create_view_sql(view_name)
self._sqlite_cur.execute(sql)
self._sqlite.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.CR_SERVER_LOST:
if not attempting_reconnect:
self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.")
self._create_view(view_name, True)
return
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
self._logger.error(
"MySQL failed reading view definition from view %s: %s",
view_name,
err,
)
raise
except sqlite3.Error as err:
self._logger.error("SQLite failed creating view %s: %s", view_name, err)
raise

def _transfer_table_data(
self, table_name: str, sql: str, total_records: int = 0, attempting_reconnect: bool = False
) -> None:
Expand Down Expand Up @@ -693,6 +822,7 @@ def _transfer_table_data(
total_records=total_records,
attempting_reconnect=True,
)
return
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
Expand Down Expand Up @@ -720,7 +850,7 @@ def transfer(self) -> None:

self._mysql_cur_prepared.execute(
"""
SELECT TABLE_NAME
SELECT TABLE_NAME, TABLE_TYPE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = SCHEMA()
AND TABLE_NAME {exclude} IN ({placeholders})
Expand All @@ -730,25 +860,49 @@ def transfer(self) -> None:
),
specific_tables,
)
tables: t.Iterable[RowItemType] = (row[0] for row in self._mysql_cur_prepared.fetchall())
tables: t.Iterable[t.Tuple[str, str]] = (
(
str(row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]),
str(row[1].decode() if isinstance(row[1], (bytes, bytearray)) else row[1]),
)
for row in self._mysql_cur_prepared.fetchall()
)
else:
# transfer all tables
self._mysql_cur.execute(
"""
SELECT TABLE_NAME
SELECT TABLE_NAME, TABLE_TYPE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = SCHEMA()
"""
)
tables = (row[0].decode() for row in self._mysql_cur.fetchall()) # type: ignore[union-attr]

def _coerce_row(row: t.Any) -> t.Tuple[str, str]:
try:
# Row like (name, type)
name = row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]
ttype = (
row[1].decode()
if (isinstance(row, (list, tuple)) and len(row) > 1 and isinstance(row[1], (bytes, bytearray)))
else (row[1] if (isinstance(row, (list, tuple)) and len(row) > 1) else "BASE TABLE")
)
return str(name), str(ttype)
except (TypeError, IndexError, UnicodeDecodeError):
# Fallback: treat as a single value name when row is not a 2-tuple or decoding fails
name = row.decode() if isinstance(row, (bytes, bytearray)) else str(row)
return name, "BASE TABLE"

tables = (_coerce_row(row) for row in self._mysql_cur.fetchall())

try:
# turn off foreign key checking in SQLite while transferring data
self._sqlite_cur.execute("PRAGMA foreign_keys=OFF")

for table_name in tables:
for table_name, table_type in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()
if isinstance(table_type, bytes):
table_type = table_type.decode()

self._logger.info(
"%s%sTransferring table %s",
Expand All @@ -761,10 +915,13 @@ def transfer(self) -> None:
self._current_chunk_number = 0

if not self._without_tables:
# create the table
self._create_table(table_name) # type: ignore[arg-type]
# create the table or view
if table_type == "VIEW" and self._views_as_views:
self._create_view(table_name) # type: ignore[arg-type]
else:
self._create_table(table_name) # type: ignore[arg-type]

if not self._without_data:
if not self._without_data and not (table_type == "VIEW" and self._views_as_views):
# get the size of the data
if self._limit_rows > 0:
# limit to the requested number of rows
Expand Down
2 changes: 2 additions & 0 deletions src/mysql_to_sqlite3/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MySQLtoSQLiteParams(TypedDict):
without_tables: t.Optional[bool]
without_data: t.Optional[bool]
without_foreign_keys: t.Optional[bool]
views_as_views: t.Optional[bool]


class MySQLtoSQLiteAttributes:
Expand Down Expand Up @@ -81,6 +82,7 @@ class MySQLtoSQLiteAttributes:
_vacuum: bool
_without_data: bool
_without_foreign_keys: bool
_views_as_views: bool
# Tracking of SQLite index names and counters to ensure uniqueness when prefixing is disabled
_seen_sqlite_index_names: t.Set[str]
_sqlite_index_name_counters: t.Dict[str, int]
68 changes: 68 additions & 0 deletions tests/unit/test_cli_views_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import typing as t

import pytest
from click.testing import CliRunner

from mysql_to_sqlite3.cli import cli as mysql2sqlite


class TestCLIViewsFlag:
def test_mysql_views_as_tables_flag_is_threaded(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure --mysql-views-as-tables reaches MySQLtoSQLite as views_as_views=False (legacy materialization)."""
received_kwargs: t.Dict[str, t.Any] = {}

class FakeConverter:
def __init__(self, **kwargs: t.Any) -> None:
received_kwargs.update(kwargs)

def transfer(self) -> None: # pragma: no cover - nothing to do
return None

# Patch the converter used by the CLI
monkeypatch.setattr("mysql_to_sqlite3.cli.MySQLtoSQLite", FakeConverter)

runner = CliRunner()
result = runner.invoke(
mysql2sqlite,
[
"-f",
"out.sqlite3",
"-d",
"db",
"-u",
"user",
"--mysql-views-as-tables",
],
)
assert result.exit_code == 0
assert received_kwargs.get("views_as_views") is False

def test_mysql_views_as_tables_short_flag_is_threaded(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure -T (short for --mysql-views-as-tables) reaches MySQLtoSQLite as views_as_views=False."""
received_kwargs: t.Dict[str, t.Any] = {}

class FakeConverter:
def __init__(self, **kwargs: t.Any) -> None:
received_kwargs.update(kwargs)

def transfer(self) -> None: # pragma: no cover - nothing to do
return None

# Patch the converter used by the CLI
monkeypatch.setattr("mysql_to_sqlite3.cli.MySQLtoSQLite", FakeConverter)

runner = CliRunner()
result = runner.invoke(
mysql2sqlite,
[
"-f",
"out.sqlite3",
"-d",
"db",
"-u",
"user",
"-T",
],
)
assert result.exit_code == 0
assert received_kwargs.get("views_as_views") is False
Loading