diff --git a/README.md b/README.md index 3db31d8..75a1c46 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,11 @@ Here are some commands: ``` Run all needed migrations on the db. Fails if a migration fails, or if there is no managed db at the path. This is equivalent to calling `fastmigrate.run_migrations()` +5. **Enroll an existing db**: + ``` + fastmigrate_enroll_db --db path/to/data.db + ``` + Enroll an existing SQLite database for versioning, adding a default initial migration called `0001-initial.sql`, then running it. Running the initial migration will set the version to 1. This is equivalent to calling `fastmigrate.enroll_db()` ### How to enroll an existing, unversioned database into fastmigrate diff --git a/fastmigrate/cli.py b/fastmigrate/cli.py index 0d17ab4..9756459 100644 --- a/fastmigrate/cli.py +++ b/fastmigrate/cli.py @@ -128,6 +128,28 @@ def create_db( print(f"Unexpected error: {e}") sys.exit(1) +@call_parse +def enroll_db( + db: Path = DEFAULT_DB, # Path to the SQLite database file + migrations: Path = DEFAULT_MIGRATIONS, # Path to the migrations directory + config_path: Path = DEFAULT_CONFIG # Path to config file +) -> None: + """Enroll an existing SQLite database for versioning, adding a default initial migration, then running it. + + Note: command line arguments take precedence over values from a + config file, unless they are equal to default values. + """ + db_path, migrations_path = _get_config(config_path, db, migrations) + if not migrations_path.exists(): migrations_path.mkdir(parents=True) + initial_migration = migrations_path / "0001-initial.sql" + schema = core.get_db_schema(db_path) + initial_migration.write_text(schema) + core._ensure_meta_table(db_path) + success = core.run_migrations(db_path, migrations_path, verbose=True) + if not success: + sys.exit(1) + + @call_parse def run_migrations( db: Path = DEFAULT_DB, # Path to the SQLite database file diff --git a/fastmigrate/core.py b/fastmigrate/core.py index 8f90a12..db278e9 100644 --- a/fastmigrate/core.py +++ b/fastmigrate/core.py @@ -10,6 +10,9 @@ from datetime import datetime from pathlib import Path from typing import Dict, Optional +from io import StringIO +from apsw import Connection +from apsw.shell import Shell from rich.console import Console @@ -98,6 +101,7 @@ def _ensure_meta_table(db_path: Path) -> None: """ ) conn.execute("INSERT INTO _meta (id, version) VALUES (1, 0)") + print('Database is enrolled') except sqlite3.Error as e: raise sqlite3.Error(f"Failed to create _meta table: {e}") except sqlite3.Error as e: @@ -199,7 +203,6 @@ def get_migration_scripts(migrations_dir: Path) -> Dict[int, Path]: """ migrations_dir = Path(migrations_dir) migration_scripts: Dict[int, Path] = {} - if not migrations_dir.exists(): return migration_scripts @@ -212,7 +215,6 @@ def get_migration_scripts(migrations_dir: Path) -> Dict[int, Path]: f"{migration_scripts[version]} and {file_path}" ) migration_scripts[version] = file_path - return migration_scripts @@ -507,3 +509,30 @@ def run_migrations( except Exception as e: console.print(f"[bold red]Error:[/bold red] {e}") return False + +def get_db_schema(db_path: Path) -> str: + """Get the SQL schema of a SQLite database file. + + This function retrieves the CREATE statements for all tables, + indices, triggers, and views in the database. + + Args: + db_path: Path to the SQLite database file + + Returns: + str: The SQL schema as a string + + Raises: + FileNotFoundError: If database file doesn't exist + sqlite3.Error: If unable to access the database + """ + + if not Path(db_path).exists(): raise FileNotFoundError(f"Database does not exist: {db_path}") + conn = Connection(str(db_path)) + out = StringIO() + shell = Shell(stdout=out, db=conn) + shell.process_command('.schema') + sql = out.getvalue() + if 'CREATE TABLE' in sql: sql = sql.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS") + if 'CREATE INDEX' in sql: sql = sql.replace("CREATE INDEX", "CREATE INDEX IF NOT EXISTS") + return sql \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index df39064..87cc88d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dev = [ fastmigrate_backup_db = "fastmigrate.cli:backup_db" fastmigrate_check_version = "fastmigrate.cli:check_version" fastmigrate_create_db = "fastmigrate.cli:create_db" +fastmigrate_enroll_db = "fastmigrate.cli:enroll_db" fastmigrate_run_migrations = "fastmigrate.cli:run_migrations" [tool.pytest] diff --git a/tests/test_cli.py b/tests/test_cli.py index 0cdd004..92f77bb 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -10,20 +10,29 @@ import subprocess from fastmigrate.cli import backup_db, check_version, create_db, run_migrations -from fastmigrate.core import _ensure_meta_table, _set_db_version +from fastmigrate.core import _set_db_version, _ensure_meta_table # Path to the test migrations directory CLI_MIGRATIONS_DIR = Path(__file__).parent / "test_cli" def test_cli_help_backup_db(): - """Test the CLI help output for .""" + """Test the CLI help output for backup_db.""" # Capture standard output result = subprocess.run(['fastmigrate_backup_db', '--help'], capture_output=True, text=True) assert result.returncode == 0 assert "usage: fastmigrate_backup_db [-h] [--db DB]" in result.stdout + +def test_cli_help_enroll_db(): + """Test the CLI help output for enroll_db.""" + # Capture standard output + result = subprocess.run(['fastmigrate_enroll_db', '--help'], + capture_output=True, text=True) + assert result.returncode == 0 + assert "usage: fastmigrate_enroll_db [-h] [--db DB]" in result.stdout + def test_cli_explicit_paths(): """Test CLI with explicit path arguments.""" with tempfile.TemporaryDirectory() as temp_dir: @@ -249,122 +258,261 @@ def test_cli_precedence(): assert cursor.fetchone() is None, "CLI DB should not have config_table" conn_cli.close() - +def test_cli_enroll_db_success(tmp_path): + """Test the CLI enroll_db command successfully enrolls an unversioned database.""" + db_path = tmp_path / "unversioned.db" + migrations_path = tmp_path / "migrations" + + # Create an unversioned database with a sample table + conn = sqlite3.connect(db_path) + conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + conn.execute("INSERT INTO users (name) VALUES ('test_user')") + conn.commit() + conn.close() + + # Verify _meta table doesn't exist yet + conn = sqlite3.connect(db_path) + cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta'") + assert cursor.fetchone() is None + conn.close() + + # Run the enroll_db command + result = subprocess.run([ + "fastmigrate_enroll_db", + "--db", db_path, + "--migrations", migrations_path, + ], capture_output=True, text=True) + + assert result.returncode == 0 + + # Verify the database has been enrolled (_meta table created) + conn = sqlite3.connect(db_path) + cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta'") + assert cursor.fetchone() is not None + + # Original data should still be intact + cursor = conn.execute("SELECT name FROM users WHERE name='test_user'") + assert cursor.fetchone() is not None + + # Version should be 0 + cursor = conn.execute("SELECT version FROM _meta WHERE id = 1") + assert cursor.fetchone()[0] == 1 + + conn.close() -def test_cli_createdb_flag(): - """Test the --create_db flag properly initializes a database with _meta table.""" - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir_path = Path(temp_dir) - db_path = temp_dir_path / "new_db.db" - - # Verify the database doesn't exist yet - assert not db_path.exists() - - # Run the CLI with just the --create_db flag - result = subprocess.run([ - "fastmigrate_create_db", - "--db", db_path, - ]) - - assert result.returncode == 0 - - # Verify database was created - assert db_path.exists() - - # Verify the _meta table exists with version 0 - conn = sqlite3.connect(db_path) - cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta'") - assert cursor.fetchone() is not None - - cursor = conn.execute("SELECT version FROM _meta WHERE id = 1") - assert cursor.fetchone()[0] == 0 + +def test_cli_enroll_db_already_versioned(tmp_path): + """Test the CLI enroll_db command fails when the database is already versioned.""" + db_path = tmp_path / "versioned.db" + migrations_path = tmp_path / "migrations" + + # Create a versioned database + conn = sqlite3.connect(db_path) + conn.execute(""" + CREATE TABLE _meta ( + id INTEGER PRIMARY KEY CHECK (id = 1), + version INTEGER NOT NULL DEFAULT 0 + ) + """) + conn.execute("INSERT INTO _meta (id, version) VALUES (1, 42)") + conn.commit() + conn.close() + + # Run the enroll_db command on an already versioned database + result = subprocess.run([ + "fastmigrate_enroll_db", + "--db", db_path, + "--migrations", migrations_path, + ], capture_output=True, text=True) + + # Should exit with zero status because the database is successfully versioned + assert result.returncode == 0 + + # Verify the database version wasn't changed + conn = sqlite3.connect(db_path) + cursor = conn.execute("SELECT version FROM _meta WHERE id = 1") + assert cursor.fetchone()[0] == 42 + conn.close() + + +def test_cli_enroll_db_nonexistent_db(tmp_path): + """Test the CLI enroll_db command fails when the database doesn't exist.""" + db_path = tmp_path / "nonexistent.db" + migrations_path = tmp_path / "migrations" + + # Verify file doesn't exist + assert not db_path.exists() + + # Run the enroll_db command on a non-existent database + result = subprocess.run([ + "fastmigrate_enroll_db", + "--db", db_path, + "--migrations", migrations_path, + ], capture_output=True, text=True) + + # Should exit with non-zero status + assert result.returncode == 1 + assert "does not exist" in result.stdout or "does not exist" in result.stderr + + +def test_cli_enroll_db_invalid_db(tmp_path): + """Test the CLI enroll_db command fails when the database is invalid.""" + db_path = tmp_path / "invalid.db" + migrations_path = tmp_path / "migrations" + + # Create an invalid database file + with open(db_path, 'wb') as f: + f.write(b'This is not a valid SQLite database') + + # Run the enroll_db command on an invalid database + result = subprocess.run([ + "fastmigrate_enroll_db", + "--db", db_path, + "--migrations", migrations_path + ], capture_output=True, text=True) + + # Should exit with non-zero status + assert result.returncode == 1 + + +def test_cli_enroll_db_with_config_file(tmp_path): + """Test the CLI enroll_db command with configuration from a file.""" + db_path = tmp_path / "db_from_config.db" + migrations_path = tmp_path / "migrations" + config_path = tmp_path / "test_config.ini" + + # Create an unversioned database + conn = sqlite3.connect(db_path) + conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)") + conn.commit() + conn.close() + + # Create a config file + config_path.write_text(f"[paths]\ndb = {db_path}\nmigrations = {migrations_path}") + + # Run the enroll_db command with config + result = subprocess.run([ + "fastmigrate_enroll_db", + "--config", config_path, + ], capture_output=True, text=True) + + assert result.returncode == 0 + + # Verify the database has been enrolled and increased in version + conn = sqlite3.connect(db_path) + cursor = conn.execute("SELECT version FROM _meta WHERE id = 1") + assert cursor.fetchone()[0] == 1 + conn.close() - conn.close() + +def test_cli_createdb_flag(tmp_path): + """Test the --create_db flag properly initializes a database with _meta table.""" + db_path = tmp_path / "new_db.db" + + # Verify the database doesn't exist yet + assert not db_path.exists() + + # Run the CLI with just the --create_db flag + result = subprocess.run([ + "fastmigrate_create_db", + "--db", db_path, + ]) + + assert result.returncode == 0 + + # Verify database was created + assert db_path.exists() + + # Verify the _meta table exists with version 0 + conn = sqlite3.connect(db_path) + cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta'") + assert cursor.fetchone() is not None + + cursor = conn.execute("SELECT version FROM _meta WHERE id = 1") + assert cursor.fetchone()[0] == 0 + + conn.close() -def test_check_db_version_option(): +def test_check_db_version_option(tmp_path): """Test the --check_db_version option correctly reports the database version.""" - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir_path = Path(temp_dir) - db_path = temp_dir_path / "test.db" - - # Create database file with version 42 - conn = sqlite3.connect(db_path) - conn.close() - _ensure_meta_table(db_path) - _set_db_version(db_path, 42) - - # Test with versioned database - result = subprocess.run([ - "fastmigrate_check_version", - "--db", db_path - ], capture_output=True, text=True) - - assert result.returncode == 0 - assert "Database version: 42" in result.stdout - - # Create unversioned database - unversioned_db = temp_dir_path / "unversioned.db" - conn = sqlite3.connect(unversioned_db) - conn.close() - - # Test with unversioned database - result = subprocess.run([ - "fastmigrate_check_version", - "--db", unversioned_db, - ], capture_output=True, text=True) - - assert result.returncode == 0 - assert "unversioned" in result.stdout.lower() - - # Test with non-existent database - nonexistent_db = temp_dir_path / "nonexistent.db" - result = subprocess.run([ - "fastmigrate_check_version", - "--db", nonexistent_db, - ], capture_output=True, text=True) - - assert result.returncode == 1 - assert "does not exist" in result.stdout + db_path = tmp_path / "test.db" + + # Create database file with version 42 + conn = sqlite3.connect(db_path) + conn.close() + _ensure_meta_table(db_path) + _set_db_version(db_path, 42) + + # Test with versioned database + result = subprocess.run([ + "fastmigrate_check_version", + "--db", db_path + ], capture_output=True, text=True) + + assert result.returncode == 0 + assert "Database version: 42" in result.stdout + + # Create unversioned database + unversioned_db = tmp_path / "unversioned.db" + conn = sqlite3.connect(unversioned_db) + conn.close() + + # Test with unversioned database + result = subprocess.run([ + "fastmigrate_check_version", + "--db", unversioned_db, + ], capture_output=True, text=True) + + assert result.returncode == 0 + assert "unversioned" in result.stdout.lower() + + # Test with non-existent database + nonexistent_db = tmp_path / "nonexistent.db" + result = subprocess.run([ + "fastmigrate_check_version", + "--db", nonexistent_db, + ], capture_output=True, text=True) + + assert result.returncode == 1 + assert "does not exist" in result.stdout -def test_cli_with_testsuite_a(): +def test_cli_with_testsuite_a(tmp_path): """Test CLI using testsuite_a.""" - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir_path = Path(temp_dir) - db_path = temp_dir_path / "test.db" - - # Create empty database file - conn = sqlite3.connect(db_path) - conn.close() - - # Initialize the database with _meta table - _ensure_meta_table(db_path) - - # Run the CLI with explicit paths to the test suite - result = subprocess.run([ - "fastmigrate_run_migrations", - "--db", db_path, - "--migrations", CLI_MIGRATIONS_DIR / "migrations" - ], capture_output=True, text=True) - - assert result.returncode == 0 - - # Verify migrations applied - conn = sqlite3.connect(db_path) - - # Version should be 4 (all migrations applied) - cursor = conn.execute("SELECT version FROM _meta") - assert cursor.fetchone()[0] == 4 - - # Verify tables exist - tables = ["users", "posts", "tags", "post_tags"] - for table in tables: - cursor = conn.execute( - f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'" - ) - assert cursor.fetchone() is not None - - conn.close() + db_path = tmp_path / "test.db" + + # Create empty database file + conn = sqlite3.connect(db_path) + conn.close() + + # Initialize the database with _meta table + _ensure_meta_table(db_path) + + # Run the CLI with explicit paths to the test suite + result = subprocess.run([ + "fastmigrate_run_migrations", + "--db", db_path, + "--migrations", CLI_MIGRATIONS_DIR / "migrations" + ], capture_output=True, text=True) + + assert result.returncode == 0 + + # Verify migrations applied + conn = sqlite3.connect(db_path) + + # Version should be 4 (all migrations applied) + cursor = conn.execute("SELECT version FROM _meta") + assert cursor.fetchone()[0] == 4 + + # Verify tables exist + tables = ["users", "posts", "tags", "post_tags"] + for table in tables: + cursor = conn.execute( + f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'" + ) + assert cursor.fetchone() is not None + + conn.close() \ No newline at end of file