Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies = [
"python-dateutil>=2.9.0.post0",
"python-slugify>=7.0.0",
"simplejson>=3.19.0",
"sqlglot>=27.27.0",
"tqdm>=4.65.0",
"tabulate",
"typing-extensions; python_version < \"3.11\"",
Expand Down
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ simplejson>=3.19.1
types-simplejson
sqlalchemy>=2.0.0
sqlalchemy-utils
sqlglot>=27.27.0
types-sqlalchemy-utils
tox
tqdm>=4.65.0
Expand Down
8 changes: 8 additions & 0 deletions src/mysql_to_sqlite3/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@
)
@click.option("-l", "--log-file", type=click.Path(), help="Log file")
@click.option("--json-as-text", is_flag=True, help="Transfer JSON columns as TEXT.")
@click.option(
"-T",
"--mysql-views-as-tables",
is_flag=True,
help="Materialize MySQL VIEWs as SQLite tables (legacy behavior).",
)
@click.option(
"-V",
"--vacuum",
Expand Down Expand Up @@ -182,6 +188,7 @@ def cli(
chunk: int,
log_file: t.Union[str, "os.PathLike[t.Any]"],
json_as_text: bool,
mysql_views_as_tables: bool,
vacuum: bool,
use_buffered_cursors: bool,
quiet: bool,
Expand Down Expand Up @@ -230,6 +237,7 @@ def cli(
mysql_ssl_disabled=skip_ssl,
chunk=chunk,
json_as_text=json_as_text,
views_as_views=not mysql_views_as_tables,
vacuum=vacuum,
buffered=use_buffered_cursors,
log_file=log_file,
Expand Down
168 changes: 160 additions & 8 deletions src/mysql_to_sqlite3/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from mysql.connector import CharacterSet, errorcode
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.types import RowItemType
from sqlglot import exp, parse_one
from sqlglot.errors import ParseError
from tqdm import tqdm, trange


Expand Down Expand Up @@ -120,6 +122,8 @@ def __init__(self, **kwargs: Unpack[MySQLtoSQLiteParams]) -> None:

self._quiet = bool(kwargs.get("quiet", False))

self._views_as_views = bool(kwargs.get("views_as_views", True))

self._sqlite_strict = bool(kwargs.get("sqlite_strict", False))

self._logger = self._setup_logger(log_file=kwargs.get("log_file") or None, quiet=self._quiet)
Expand Down Expand Up @@ -650,6 +654,127 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) ->
self._logger.error("SQLite failed creating table %s: %s", table_name, err)
raise

@staticmethod
def _mysql_viewdef_to_sqlite(
view_select_sql: str,
view_name: str,
schema_name: t.Optional[str] = None,
keep_schema: bool = False,
) -> str:
"""
Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement.

If keep_schema is False and schema_name is provided, strip qualifiers like `example`.table.
If keep_schema is True, you must ATTACH the SQLite database as that schema name before using the view.
"""
# Normalize whitespace and avoid double semicolons in output
cleaned_sql = view_select_sql.strip().rstrip(";")

try:
tree = parse_one(cleaned_sql, read="mysql")
except (ParseError, ValueError):
# Fallback: return a basic CREATE VIEW using the original SELECT
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{cleaned_sql};'

if not keep_schema and schema_name:
# Remove schema qualifiers that match schema_name
for tbl in tree.find_all(exp.Table):
db = tbl.args.get("db")
if db and db.name.strip('`"') == schema_name:
tbl.set("db", None)

sqlite_select = tree.sql(dialect="sqlite")
return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{sqlite_select};'

def _build_create_view_sql(self, view_name: str) -> str:
"""Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition."""
# Try to obtain the view definition from information_schema.VIEWS
definition: t.Optional[str] = None
try:
self._mysql_cur_dict.execute(
"""
SELECT VIEW_DEFINITION AS `definition`
FROM information_schema.VIEWS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
""",
(self._mysql_database, view_name),
)
row: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone()
if row is not None and row.get("definition") is not None:
val = row["definition"]
if isinstance(val, bytes):
try:
definition = val.decode()
except UnicodeDecodeError:
definition = str(val)
else:
definition = t.cast(str, val)
except mysql.connector.Error:
# Fall back to SHOW CREATE VIEW below
definition = None

if not definition:
# Fallback: use SHOW CREATE VIEW and extract the SELECT part
try:
self._mysql_cur.execute(f"SHOW CREATE VIEW `{view_name}`")
res = self._mysql_cur.fetchone()
if res and len(res) >= 2:
create_stmt = res[1]
if isinstance(create_stmt, bytes):
try:
create_stmt_str = create_stmt.decode()
except UnicodeDecodeError:
create_stmt_str = str(create_stmt)
else:
create_stmt_str = t.cast(str, create_stmt)
# Extract the SELECT ... part after AS
m = re.search(r"\\bAS\\b\\s*(.*)$", create_stmt_str, re.IGNORECASE | re.DOTALL)
if m:
definition = m.group(1).strip().rstrip(";")
else:
# As a last resort, try to use the full statement replacing the prefix
# Not ideal, but better than failing outright
idx = create_stmt_str.upper().find(" AS ")
if idx != -1:
definition = create_stmt_str[idx + 4 :].strip().rstrip(";")
except mysql.connector.Error:
pass

if not definition:
raise sqlite3.Error(f"Unable to fetch definition for MySQL view '{view_name}'")

return self._mysql_viewdef_to_sqlite(
view_name=view_name,
view_select_sql=definition,
schema_name=self._mysql_database,
)

def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None:
try:
if attempting_reconnect:
self._mysql.reconnect()
sql = self._build_create_view_sql(view_name)
self._sqlite_cur.execute(sql)
self._sqlite.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.CR_SERVER_LOST:
if not attempting_reconnect:
self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.")
self._create_view(view_name, True)
else:
self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.")
raise
self._logger.error(
"MySQL failed reading view definition from view %s: %s",
view_name,
err,
)
raise
except sqlite3.Error as err:
self._logger.error("SQLite failed creating view %s: %s", view_name, err)
raise

def _transfer_table_data(
self, table_name: str, sql: str, total_records: int = 0, attempting_reconnect: bool = False
) -> None:
Expand Down Expand Up @@ -720,7 +845,7 @@ def transfer(self) -> None:

self._mysql_cur_prepared.execute(
"""
SELECT TABLE_NAME
SELECT TABLE_NAME, TABLE_TYPE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = SCHEMA()
AND TABLE_NAME {exclude} IN ({placeholders})
Expand All @@ -730,25 +855,49 @@ def transfer(self) -> None:
),
specific_tables,
)
tables: t.Iterable[RowItemType] = (row[0] for row in self._mysql_cur_prepared.fetchall())
tables: t.Iterable[t.Tuple[str, str]] = (
(
str(row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]),
str(row[1].decode() if isinstance(row[1], (bytes, bytearray)) else row[1]),
)
for row in self._mysql_cur_prepared.fetchall()
)
else:
# transfer all tables
self._mysql_cur.execute(
"""
SELECT TABLE_NAME
SELECT TABLE_NAME, TABLE_TYPE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = SCHEMA()
"""
)
tables = (row[0].decode() for row in self._mysql_cur.fetchall()) # type: ignore[union-attr]

def _coerce_row(row: t.Any) -> t.Tuple[str, str]:
try:
# Row like (name, type)
name = row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]
ttype = (
row[1].decode()
if (isinstance(row, (list, tuple)) and len(row) > 1 and isinstance(row[1], (bytes, bytearray)))
else (row[1] if (isinstance(row, (list, tuple)) and len(row) > 1) else "BASE TABLE")
)
return str(name), str(ttype)
except (TypeError, IndexError, UnicodeDecodeError):
# Fallback: treat as a single value name when row is not a 2-tuple or decoding fails
name = row.decode() if isinstance(row, (bytes, bytearray)) else str(row)
return name, "BASE TABLE"

tables = (_coerce_row(row) for row in self._mysql_cur.fetchall())

try:
# turn off foreign key checking in SQLite while transferring data
self._sqlite_cur.execute("PRAGMA foreign_keys=OFF")

for table_name in tables:
for table_name, table_type in tables:
if isinstance(table_name, bytes):
table_name = table_name.decode()
if isinstance(table_type, bytes):
table_type = table_type.decode()

self._logger.info(
"%s%sTransferring table %s",
Expand All @@ -761,10 +910,13 @@ def transfer(self) -> None:
self._current_chunk_number = 0

if not self._without_tables:
# create the table
self._create_table(table_name) # type: ignore[arg-type]
# create the table or view
if table_type == "VIEW" and self._views_as_views:
self._create_view(table_name) # type: ignore[arg-type]
else:
self._create_table(table_name) # type: ignore[arg-type]

if not self._without_data:
if not self._without_data and not (table_type == "VIEW" and self._views_as_views):
# get the size of the data
if self._limit_rows > 0:
# limit to the requested number of rows
Expand Down
2 changes: 2 additions & 0 deletions src/mysql_to_sqlite3/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MySQLtoSQLiteParams(TypedDict):
without_tables: t.Optional[bool]
without_data: t.Optional[bool]
without_foreign_keys: t.Optional[bool]
views_as_views: t.Optional[bool]


class MySQLtoSQLiteAttributes:
Expand Down Expand Up @@ -81,6 +82,7 @@ class MySQLtoSQLiteAttributes:
_vacuum: bool
_without_data: bool
_without_foreign_keys: bool
_views_as_views: bool
# Tracking of SQLite index names and counters to ensure uniqueness when prefixing is disabled
_seen_sqlite_index_names: t.Set[str]
_sqlite_index_name_counters: t.Dict[str, int]
68 changes: 68 additions & 0 deletions tests/unit/test_cli_views_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import typing as t

import pytest
from click.testing import CliRunner

from mysql_to_sqlite3.cli import cli as mysql2sqlite


class TestCLIViewsFlag:
def test_mysql_views_as_tables_flag_is_threaded(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure --mysql-views-as-tables reaches MySQLtoSQLite as views_as_views=False (legacy materialization)."""
received_kwargs: t.Dict[str, t.Any] = {}

class FakeConverter:
def __init__(self, **kwargs: t.Any) -> None:
received_kwargs.update(kwargs)

def transfer(self) -> None: # pragma: no cover - nothing to do
return None

# Patch the converter used by the CLI
monkeypatch.setattr("mysql_to_sqlite3.cli.MySQLtoSQLite", FakeConverter)

runner = CliRunner()
result = runner.invoke(
mysql2sqlite,
[
"-f",
"out.sqlite3",
"-d",
"db",
"-u",
"user",
"--mysql-views-as-tables",
],
)
assert result.exit_code == 0
assert received_kwargs.get("views_as_views") is False

def test_mysql_views_as_tables_short_flag_is_threaded(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure -T (short for --mysql-views-as-tables) reaches MySQLtoSQLite as views_as_views=False."""
received_kwargs: t.Dict[str, t.Any] = {}

class FakeConverter:
def __init__(self, **kwargs: t.Any) -> None:
received_kwargs.update(kwargs)

def transfer(self) -> None: # pragma: no cover - nothing to do
return None

# Patch the converter used by the CLI
monkeypatch.setattr("mysql_to_sqlite3.cli.MySQLtoSQLite", FakeConverter)

runner = CliRunner()
result = runner.invoke(
mysql2sqlite,
[
"-f",
"out.sqlite3",
"-d",
"db",
"-u",
"user",
"-T",
],
)
assert result.exit_code == 0
assert received_kwargs.get("views_as_views") is False
28 changes: 28 additions & 0 deletions tests/unit/test_transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,34 @@


class TestMySQLtoSQLiteTransporter:
def test_transfer_creates_view_when_flag_enabled(self) -> None:
"""When views_as_views is True, encountering a MySQL VIEW should create a SQLite VIEW and skip data transfer."""
with patch.object(MySQLtoSQLite, "__init__", return_value=None):
instance = MySQLtoSQLite()
# Configure minimal attributes used by transfer()
instance._mysql_tables = []
instance._exclude_mysql_tables = []
instance._mysql_cur = MagicMock()
# All-tables branch returns one VIEW
instance._mysql_cur.fetchall.return_value = [(b"my_view", b"VIEW")]
instance._sqlite_cur = MagicMock()
instance._without_data = False
instance._without_tables = False
instance._views_as_views = True
instance._vacuum = False
instance._logger = MagicMock()

# Spy on methods to ensure correct calls
instance._create_view = MagicMock()
instance._create_table = MagicMock()
instance._transfer_table_data = MagicMock()

instance.transfer()

instance._create_view.assert_called_once_with("my_view")
instance._create_table.assert_not_called()
instance._transfer_table_data.assert_not_called()

def test_decode_column_type_with_string(self) -> None:
"""Test _decode_column_type with string input."""
assert MySQLtoSQLite._decode_column_type("VARCHAR") == "VARCHAR"
Expand Down
Loading