Skip to content

Commit 096f139

Browse files
committed
drop data objects of different types
1 parent 325bf76 commit 096f139

File tree

5 files changed

+118
-6
lines changed

5 files changed

+118
-6
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
CommentCreationTable,
3333
CommentCreationView,
3434
DataObject,
35+
DataObjectType,
3536
EngineRunMode,
3637
InsertOverwriteStrategy,
3738
SourceQuery,
@@ -369,6 +370,9 @@ def replace_query(
369370
kwargs: Optional create table properties.
370371
"""
371372
target_table = exp.to_table(table_name)
373+
374+
table_exists = self._drop_data_object_on_type_mismatch(target_table, DataObjectType.TABLE)
375+
372376
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
373377
query_or_df, columns_to_types, target_table=target_table
374378
)
@@ -390,7 +394,7 @@ def replace_query(
390394
)
391395
# All engines support `CREATE TABLE AS` so we use that if the table doesn't already exist and we
392396
# use `CREATE OR REPLACE TABLE AS` if the engine supports it
393-
if self.SUPPORTS_REPLACE_TABLE or not self.table_exists(target_table):
397+
if self.SUPPORTS_REPLACE_TABLE or not table_exists:
394398
return self._create_table_from_source_queries(
395399
target_table,
396400
source_queries,
@@ -930,6 +934,28 @@ def clone_table(
930934
)
931935
)
932936

937+
def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
938+
"""Drops a data object of arbitrary type.
939+
940+
Args:
941+
data_object: The data object to drop.
942+
ignore_if_not_exists: If True, no error will be raised if the data object does not exist.
943+
"""
944+
if data_object.type.is_view:
945+
self.drop_view(data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists)
946+
elif data_object.type.is_materialized_view:
947+
self.drop_view(
948+
data_object.to_table(), ignore_if_not_exists=ignore_if_not_exists, materialized=True
949+
)
950+
elif data_object.type.is_table:
951+
self.drop_table(data_object.to_table(), exists=ignore_if_not_exists)
952+
elif data_object.type.is_managed_table:
953+
self.drop_managed_table(data_object.to_table(), exists=ignore_if_not_exists)
954+
else:
955+
raise SQLMeshError(
956+
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
957+
)
958+
933959
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
934960
"""Drops a table.
935961
@@ -1118,6 +1144,12 @@ def create_view(
11181144
if properties.expressions:
11191145
create_kwargs["properties"] = properties
11201146

1147+
if replace:
1148+
self._drop_data_object_on_type_mismatch(
1149+
view_name,
1150+
DataObjectType.VIEW if not materialized else DataObjectType.MATERIALIZED_VIEW,
1151+
)
1152+
11211153
with source_queries[0] as query:
11221154
self.execute(
11231155
exp.Create(
@@ -2483,6 +2515,35 @@ def _truncate_table(self, table_name: TableName) -> None:
24832515
table = exp.to_table(table_name)
24842516
self.execute(f"TRUNCATE TABLE {table.sql(dialect=self.dialect, identify=True)}")
24852517

2518+
def _drop_data_object_on_type_mismatch(
2519+
self, target_name: TableName, expected_type: DataObjectType
2520+
) -> bool:
2521+
"""Drops a data object if it exists and is not of the expected type.
2522+
2523+
Args:
2524+
target_name: The name of the data object to check.
2525+
expected_type: The expected type of the data object.
2526+
2527+
Returns:
2528+
True if the data object exists and is of the expected type, False otherwise.
2529+
"""
2530+
target_table = exp.to_table(target_name)
2531+
existing_data_objects = self.get_data_objects(
2532+
schema_(target_table.db, target_table.catalog), {target_table.name}
2533+
)
2534+
if existing_data_objects:
2535+
if existing_data_objects[0].type == expected_type:
2536+
return True
2537+
2538+
logger.warning(
2539+
"Target data object '%s' is a %s and not a %s, dropping it",
2540+
target_table.sql(dialect=self.dialect),
2541+
existing_data_objects[0].type.value,
2542+
expected_type.value,
2543+
)
2544+
self.drop_data_object(existing_data_objects[0])
2545+
return False
2546+
24862547
def _replace_by_key(
24872548
self,
24882549
target_table: TableName,

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,9 @@ def replace_query(
262262
"""
263263
import pandas as pd
264264

265-
if not isinstance(query_or_df, pd.DataFrame) or not self.table_exists(table_name):
265+
table_exists = self._drop_data_object_on_type_mismatch(table_name, DataObjectType.TABLE)
266+
267+
if not isinstance(query_or_df, pd.DataFrame) or not table_exists:
266268
return super().replace_query(
267269
table_name,
268270
query_or_df,

sqlmesh/core/engine_adapter/shared.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ class DataObject(PydanticModel):
171171
def is_clustered(self) -> bool:
172172
return bool(self.clustering_key)
173173

174+
def to_table(self) -> exp.Table:
175+
return exp.table_(self.name, db=self.schema_name, catalog=self.catalog, quoted=True)
176+
174177

175178
class CatalogSupport(Enum):
176179
# The engine has no concept of catalogs

sqlmesh/core/plan/builder.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -896,15 +896,15 @@ def _modified_and_added_snapshots(self) -> t.List[Snapshot]:
896896

897897
def _should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
898898
if old.virtual_environment_mode != new.virtual_environment_mode:
899-
# If the virtual environment mode has changed, then it's a breaking change
899+
# If the virtual environment mode has changed, then we need to rebuild
900900
return True
901901
if old.model.kind.name == new.model.kind.name:
902-
# If the kind hasn't changed, then it's not a breaking change
902+
# If the kind hasn't changed, then we don't need to rebuild
903903
return False
904904
if not old.is_incremental or not new.is_incremental:
905-
# If either is not incremental, then it's a breaking change
905+
# If either is not incremental, then we need to rebuild
906906
return True
907907
if old.model.partitioned_by == new.model.partitioned_by:
908-
# If the partitioning hasn't changed, then it's not a breaking change
908+
# If the partitioning hasn't changed, then we don't need to rebuild
909909
return False
910910
return True

tests/core/test_integration.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2525,6 +2525,52 @@ def test_virtual_environment_mode_dev_only(init_and_plan_context: t.Callable):
25252525
assert "one" not in context.engine_adapter.columns("sushi.waiter_revenue_by_day")
25262526

25272527

2528+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2529+
def test_virtual_environment_mode_dev_only_model_kind_change(init_and_plan_context: t.Callable):
2530+
context, plan = init_and_plan_context(
2531+
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
2532+
)
2533+
context.apply(plan)
2534+
2535+
# Change to full kind
2536+
model = context.get_model("sushi.top_waiters")
2537+
model = model.copy(update={"kind": FullKind()})
2538+
context.upsert_model(model)
2539+
prod_plan = context.plan_builder("prod", skip_tests=True).build()
2540+
assert prod_plan.missing_intervals
2541+
assert prod_plan.requires_backfill
2542+
context.apply(prod_plan)
2543+
data_objects = context.engine_adapter.get_data_objects("sushi", {"top_waiters"})
2544+
assert len(data_objects) == 1
2545+
assert data_objects[0].type == "table"
2546+
2547+
context.state_sync.clear_cache()
2548+
2549+
# Change back to view
2550+
model = context.get_model("sushi.top_waiters")
2551+
model = model.copy(update={"kind": ViewKind()})
2552+
context.upsert_model(model)
2553+
prod_plan = context.plan_builder("prod", skip_tests=True).build()
2554+
assert prod_plan.requires_backfill
2555+
assert prod_plan.missing_intervals
2556+
context.apply(prod_plan)
2557+
data_objects = context.engine_adapter.get_data_objects("sushi", {"top_waiters"})
2558+
assert len(data_objects) == 1
2559+
assert data_objects[0].type == "view"
2560+
2561+
# Change to incremental
2562+
model = context.get_model("sushi.top_waiters")
2563+
model = model.copy(update={"kind": IncrementalUnmanagedKind()})
2564+
context.upsert_model(model)
2565+
prod_plan = context.plan_builder("prod", skip_tests=True).build()
2566+
assert prod_plan.requires_backfill
2567+
assert prod_plan.missing_intervals
2568+
context.apply(prod_plan)
2569+
data_objects = context.engine_adapter.get_data_objects("sushi", {"top_waiters"})
2570+
assert len(data_objects) == 1
2571+
assert data_objects[0].type == "table"
2572+
2573+
25282574
@time_machine.travel("2023-01-08 15:00:00 UTC")
25292575
def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
25302576
context, plan = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)