From ebacc568c955774d56c2eb4721156d28cf73aa5c Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Fri, 25 Jul 2025 18:16:14 -0700
Subject: [PATCH 1/5] create ingestion batches

---
 .github/workflows/metadata-ingestion.yml | 13 +++++++---
 metadata-ingestion/build.gradle | 26 +++++++------------
 metadata-ingestion/setup.cfg | 2 ++
 metadata-ingestion/tests/conftest.py | 3 ++-
 .../integration/cassandra/test_cassandra.py | 2 ++
 .../tests/integration/dremio/test_dremio.py | 2 ++
 .../tests/integration/hana/test_hana.py | 2 +-
 .../tests/integration/kafka/test_kafka.py | 2 ++
 .../powerbi/test_admin_only_api.py | 2 +-
 .../integration/powerbi/test_m_parser.py | 2 +-
 .../tests/integration/powerbi/test_powerbi.py | 2 +-
 11 files changed, 32 insertions(+), 26 deletions(-)

diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml
index fa2982a41734a4..892809a6995808 100644
--- a/.github/workflows/metadata-ingestion.yml
+++ b/.github/workflows/metadata-ingestion.yml
@@ -38,14 +38,19 @@ jobs:
       # DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
     strategy:
       matrix:
-        python-version: ["3.9", "3.11"]
-        command:
-          [
-            "testQuick",
+        python-version: ["3.11"]
+        command: [
+            "testQuick", # also runs lint
             "testIntegrationBatch0",
             "testIntegrationBatch1",
             "testIntegrationBatch2",
+            "testIntegrationBatch3",
+            "testIntegrationBatch4",
           ]
+        include:
+          # Version compatibility tests.
+          - python-version: "3.9"
+            command: "testQuick"
       fail-fast: false
     steps:
       - name: Free up disk space
diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle
index baab804a0dc05c..afec1afbc6765f 100644
--- a/metadata-ingestion/build.gradle
+++ b/metadata-ingestion/build.gradle
@@ -191,23 +191,15 @@ task testSingle(dependsOn: [installDevTest]) {
   }
 }
 
-task testIntegrationBatch0(type: Exec, dependsOn: [installDevTest]) {
-  def cvg_arg = get_coverage_args("intBatch0")
-  commandLine 'bash', '-c',
-    venv_activate_command +
-    "${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_0' --junit-xml=junit.integrationbatch0.xml"
-}
-task testIntegrationBatch1(type: Exec, dependsOn: [installDevTest]) {
-  def cvg_arg = get_coverage_args("intBatch1")
-  commandLine 'bash', '-c',
-    venv_activate_command +
-    "${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_1' --junit-xml=junit.integrationbatch1.xml"
-}
-task testIntegrationBatch2(type: Exec, dependsOn: [installDevTest]) {
-  def cvg_arg = get_coverage_args("intBatch2")
-  commandLine 'bash', '-c',
-    venv_activate_command +
-    "${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_2' --junit-xml=junit.integrationbatch2.xml"
+// Create testIntegrationBatch0 through testIntegrationBatch4 tasks
+(0..4).each { batchNum ->
+  tasks.register("testIntegrationBatch${batchNum}", Exec) {
+    dependsOn installDevTest
+    def cvg_arg = get_coverage_args("intBatch${batchNum}")
+    commandLine 'bash', '-c',
+      venv_activate_command +
+      "${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_${batchNum}' --junit-xml=junit.integrationbatch${batchNum}.xml"
+  }
+}
 
 task testFull(type: Exec, dependsOn: [installDevTest]) {
diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg
index 4cc8b1c79eb3f6..5d7a7dab4ea5d4 100644
--- a/metadata-ingestion/setup.cfg
+++ b/metadata-ingestion/setup.cfg
@@ -63,6 +63,8 @@ markers =
     integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch.
     integration_batch_1: mark tests to run in batch 1 of integration tests
     integration_batch_2: mark tests to run in batch 2 of integration tests
+    integration_batch_3: mark tests to run in batch 3 of integration tests (mostly powerbi)
+    integration_batch_4: mark tests to run in batch 4 of integration tests
 testpaths =
     tests/unit
     tests/integration
diff --git a/metadata-ingestion/tests/conftest.py b/metadata-ingestion/tests/conftest.py
index 4cc25bcf38b08c..c6d7f6c0fb03aa 100644
--- a/metadata-ingestion/tests/conftest.py
+++ b/metadata-ingestion/tests/conftest.py
@@ -73,7 +73,8 @@ def pytest_collection_modifyitems(
     if (
         "docker_compose_runner" in item.fixturenames  # type: ignore[attr-defined]
         or any(
-            marker.name == "integration_batch_2" for marker in item.iter_markers()
+            marker.name.startswith("integration_batch_")
+            for marker in item.iter_markers()
         )
     ):
         item.add_marker(pytest.mark.slow)
diff --git a/metadata-ingestion/tests/integration/cassandra/test_cassandra.py b/metadata-ingestion/tests/integration/cassandra/test_cassandra.py
index ddb11acabc2371..0a3dd5f6bb2492 100644
--- a/metadata-ingestion/tests/integration/cassandra/test_cassandra.py
+++ b/metadata-ingestion/tests/integration/cassandra/test_cassandra.py
@@ -13,6 +13,8 @@
 
 _resources_dir = pathlib.Path(__file__).parent
 
+pytestmark = pytest.mark.integration_batch_4
+
 
 @pytest.mark.integration
 def test_cassandra_ingest(docker_compose_runner, pytestconfig, tmp_path, monkeypatch):
diff --git a/metadata-ingestion/tests/integration/dremio/test_dremio.py b/metadata-ingestion/tests/integration/dremio/test_dremio.py
index d070ae1015558d..be3ac31a34b156 100644
--- a/metadata-ingestion/tests/integration/dremio/test_dremio.py
+++ b/metadata-ingestion/tests/integration/dremio/test_dremio.py
@@ -12,6 +12,8 @@
 from tests.test_helpers.click_helpers import run_datahub_cmd
 from tests.test_helpers.docker_helpers import wait_for_port
 
+pytestmark = pytest.mark.integration_batch_4
+
 FROZEN_TIME = "2023-10-15 07:00:00"
 MINIO_PORT = 9000
 MYSQL_PORT = 3306
diff --git a/metadata-ingestion/tests/integration/hana/test_hana.py b/metadata-ingestion/tests/integration/hana/test_hana.py
index 63903ddcc47a82..066bac8f3f9f10 100644
--- a/metadata-ingestion/tests/integration/hana/test_hana.py
+++ b/metadata-ingestion/tests/integration/hana/test_hana.py
@@ -7,7 +7,7 @@
 from tests.test_helpers.click_helpers import run_datahub_cmd
 from tests.test_helpers.docker_helpers import wait_for_port
 
-pytestmark = pytest.mark.integration_batch_2
+pytestmark = pytest.mark.integration_batch_4
 
 FROZEN_TIME = "2020-04-14 07:00:00"
diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py
b/metadata-ingestion/tests/integration/kafka/test_kafka.py
index 70e7537deb29ff..45ee5ec39f7642 100644
--- a/metadata-ingestion/tests/integration/kafka/test_kafka.py
+++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py
@@ -15,6 +15,8 @@
 from tests.test_helpers.click_helpers import run_datahub_cmd
 from tests.test_helpers.docker_helpers import wait_for_port
 
+pytestmark = pytest.mark.integration_batch_4
+
 FROZEN_TIME = "2020-04-14 07:00:00"
 
diff --git a/metadata-ingestion/tests/integration/powerbi/test_admin_only_api.py b/metadata-ingestion/tests/integration/powerbi/test_admin_only_api.py
index c15bd686465ca5..addbbce2feec9d 100644
--- a/metadata-ingestion/tests/integration/powerbi/test_admin_only_api.py
+++ b/metadata-ingestion/tests/integration/powerbi/test_admin_only_api.py
@@ -7,7 +7,7 @@
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.testing import mce_helpers
 
-pytestmark = pytest.mark.integration_batch_2
+pytestmark = pytest.mark.integration_batch_3
 
 FROZEN_TIME = "2022-02-03 07:00:00"
diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
index 2b4e036d88b0cc..cf22f2583a9c64 100644
--- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
+++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
@@ -25,7 +25,7 @@
     Lineage,
 )
 
-pytestmark = pytest.mark.integration_batch_2
+pytestmark = pytest.mark.integration_batch_3
 
 M_QUERIES = [
     'let\n    Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n    PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n    TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n    TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n    TESTTABLE_Table',
diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py
index 630cc44e28b25f..2fdc2a1f95267b 100644
--- a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py
+++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py
@@ -26,7 +26,7 @@
 from datahub.testing import mce_helpers
 from tests.test_helpers import test_connection_helpers
 
-pytestmark = pytest.mark.integration_batch_2
+pytestmark = pytest.mark.integration_batch_3
 
 FROZEN_TIME = "2022-02-03 07:00:00"

From 4b986a148c01a2582e05311df04f5d9beab0a3b4 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Fri, 25 Jul 2025 19:02:08 -0700
Subject: [PATCH 2/5] rename workflow job and prune kafka integration tests

---
 .github/workflows/metadata-ingestion.yml | 4 +-
 .../tests/integration/kafka/test_kafka.py | 3 -
 .../integration/kafka/test_kafka_state.py | 182 ------------------
 3 files changed, 2 insertions(+), 187 deletions(-)
 delete mode 100644 metadata-ingestion/tests/integration/kafka/test_kafka_state.py

diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml
index 892809a6995808..ae1713370317bf 100644
--- a/.github/workflows/metadata-ingestion.yml
+++ b/.github/workflows/metadata-ingestion.yml
@@ -1,4 +1,4 @@
-name: metadata ingestion
+name: Metadata Ingestion
 on:
   push:
     branches:
@@ -29,7 +29,7 @@ env:
   DEPOT_PROJECT_ID: "${{ vars.DEPOT_PROJECT_ID }}"
 
 jobs:
-  metadata-ingestion:
+  ci:
     runs-on: ${{ vars.DEPOT_PROJECT_ID != '' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
     timeout-minutes: 60
     env:
diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py
index
45ee5ec39f7642..8ac216e6fa8376 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py @@ -43,7 +43,6 @@ def mock_kafka_service(docker_compose_runner, test_resources_dir): @pytest.mark.parametrize("approach", ["kafka_without_schemas", "kafka"]) @freeze_time(FROZEN_TIME) -@pytest.mark.integration def test_kafka_ingest( mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time, approach ): @@ -83,7 +82,6 @@ def test_kafka_ingest( ), ], ) -@pytest.mark.integration @freeze_time(FROZEN_TIME) def test_kafka_test_connection(mock_kafka_service, config_dict, is_success): report = test_connection_helpers.run_test_connection(KafkaSource, config_dict) @@ -106,7 +104,6 @@ def test_kafka_test_connection(mock_kafka_service, config_dict, is_success): @freeze_time(FROZEN_TIME) -@pytest.mark.integration def test_kafka_oauth_callback( mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time ): diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka_state.py b/metadata-ingestion/tests/integration/kafka/test_kafka_state.py deleted file mode 100644 index d2e447464989ed..00000000000000 --- a/metadata-ingestion/tests/integration/kafka/test_kafka_state.py +++ /dev/null @@ -1,182 +0,0 @@ -import time -from typing import Any, Dict, List -from unittest.mock import patch - -import pytest -from confluent_kafka.admin import AdminClient, NewTopic -from freezegun import freeze_time - -from tests.test_helpers.docker_helpers import wait_for_port -from tests.test_helpers.state_helpers import ( - get_current_checkpoint_from_pipeline, - run_and_get_pipeline, - validate_all_providers_have_committed_successfully, -) - -FROZEN_TIME = "2020-04-14 07:00:00" -KAFKA_PORT = 29092 -KAFKA_BOOTSTRAP_SERVER = f"localhost:{KAFKA_PORT}" -GMS_PORT = 8080 -GMS_SERVER = f"http://localhost:{GMS_PORT}" - - -class KafkaTopicsCxtManager: - def __init__(self, topics: List[str], bootstrap_servers: str) -> None: - self.topics = topics - self.bootstrap_servers = bootstrap_servers - - def create_kafka_topics(self, topics: List[NewTopic]) -> None: - """ - creates new kafka topics - """ - admin_config: Dict[str, Any] = { - "bootstrap.servers": self.bootstrap_servers, - } - a = AdminClient(admin_config) - - fs = a.create_topics(topics, operation_timeout=3) - - # Wait for operation to finish. - for topic, f in fs.items(): - try: - f.result() # The result itself is None - print(f"Topic {topic} created") - except Exception as e: - print(f"Failed to create topic {topic}: {e}") - raise e - - def delete_kafka_topics(self, topics: List[str]) -> None: - """ - delete a list of existing Kafka topics - """ - admin_config: Dict[str, Any] = { - "bootstrap.servers": self.bootstrap_servers, - } - a = AdminClient(admin_config) - - fs = a.delete_topics(topics, operation_timeout=3) - - # Wait for operation to finish. 
- for topic, f in fs.items(): - try: - f.result() # The result itself is None - print(f"Topic {topic} deleted") - except Exception as e: - # this error should be ignored when we already deleted - # the topic within the test code - print(f"Failed to delete topic {topic}: {e}") - - def __enter__(self): - topics = [ - NewTopic(topic=topic_name, num_partitions=1, replication_factor=1) - for topic_name in self.topics - ] - self.create_kafka_topics(topics) - return self - - def __exit__(self, exc_type, exc, traceback): - self.delete_kafka_topics(self.topics) - - -@freeze_time(FROZEN_TIME) -@pytest.mark.integration -def test_kafka_ingest_with_stateful( - docker_compose_runner, pytestconfig, tmp_path, mock_time, mock_datahub_graph -): - test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka" - topic_prefix: str = "stateful_ingestion_test" - topic_names: List[str] = [f"{topic_prefix}_t1", f"{topic_prefix}_t2"] - platform_instance = "test_platform_instance_1" - - with docker_compose_runner( - test_resources_dir / "docker-compose.yml", "kafka" - ) as docker_services: - wait_for_port(docker_services, "test_broker", KAFKA_PORT, timeout=120) - wait_for_port(docker_services, "test_schema_registry", 8081, timeout=120) - - source_config_dict: Dict[str, Any] = { - "connection": { - "bootstrap": KAFKA_BOOTSTRAP_SERVER, - }, - "platform_instance": f"{platform_instance}", - # enable stateful ingestion - "stateful_ingestion": { - "enabled": True, - "remove_stale_metadata": True, - "fail_safe_threshold": 100.0, - "state_provider": { - "type": "datahub", - "config": {"datahub_api": {"server": GMS_SERVER}}, - }, - }, - } - - pipeline_config_dict: Dict[str, Any] = { - "source": { - "type": "kafka", - "config": source_config_dict, - }, - "sink": { - # we are not really interested in the resulting events for this test - "type": "console" - }, - "pipeline_name": "test_pipeline", - # enable reporting - "reporting": [ - { - "type": "datahub", - } - ], - } - - # topics will be automatically created and deleted upon test completion - with ( - KafkaTopicsCxtManager(topic_names, KAFKA_BOOTSTRAP_SERVER) as kafka_ctx, - patch( - "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", - mock_datahub_graph, - ) as mock_checkpoint, - ): - # both checkpoint and reporting will use the same mocked graph instance - mock_checkpoint.return_value = mock_datahub_graph - - # 1. Do the first run of the pipeline and get the default job's checkpoint. - pipeline_run1 = run_and_get_pipeline(pipeline_config_dict) - checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) - - assert checkpoint1 - assert checkpoint1.state - - # 2. Drop the first topic created during step 1 + rerun the pipeline and get the checkpoint state. - kafka_ctx.delete_kafka_topics([kafka_ctx.topics[0]]) - # sleep to guarantee eventual consistency for kafka topic deletion - time.sleep(1) - pipeline_run2 = run_and_get_pipeline(pipeline_config_dict) - checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) - - assert checkpoint2 - assert checkpoint2.state - - # 3. Perform all assertions on the states. The deleted topic should not be - # part of the second state - state1 = checkpoint1.state - state2 = checkpoint2.state - difference_urns = list( - state1.get_urns_not_in(type="topic", other_checkpoint_state=state2) - ) - - assert len(difference_urns) == 1 - assert ( - difference_urns[0] - == f"urn:li:dataset:(urn:li:dataPlatform:kafka,{platform_instance}.{kafka_ctx.topics[0]},PROD)" - ) - - # 4. 
Validate that all providers have committed successfully.
-    # NOTE: The following validation asserts for presence of state as well
-    # and validates reporting.
-    validate_all_providers_have_committed_successfully(
-        pipeline=pipeline_run1, expected_providers=1
-    )
-    validate_all_providers_have_committed_successfully(
-        pipeline=pipeline_run1, expected_providers=1
-    )

From 9537262b89038a1135892802505be4dd54828cd4 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Mon, 28 Jul 2025 10:08:36 -0700
Subject: [PATCH 3/5] add 6th batch

---
 .github/workflows/metadata-ingestion.yml | 1 +
 metadata-ingestion/build.gradle | 4 ++--
 metadata-ingestion/setup.cfg | 1 +
 metadata-ingestion/tests/integration/powerbi/test_m_parser.py | 2 +-
 4 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml
index ae1713370317bf..a434fc779f5603 100644
--- a/.github/workflows/metadata-ingestion.yml
+++ b/.github/workflows/metadata-ingestion.yml
@@ -46,6 +46,7 @@ jobs:
             "testIntegrationBatch2",
             "testIntegrationBatch3",
             "testIntegrationBatch4",
+            "testIntegrationBatch5",
           ]
         include:
           # Version compatibility tests.
diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle
index afec1afbc6765f..2de729d6d896c5 100644
--- a/metadata-ingestion/build.gradle
+++ b/metadata-ingestion/build.gradle
@@ -191,8 +191,8 @@ task testSingle(dependsOn: [installDevTest]) {
   }
 }
 
-// Create testIntegrationBatch0 through testIntegrationBatch4 tasks
-(0..4).each { batchNum ->
+// Create testIntegrationBatch0 through testIntegrationBatch5 tasks
+(0..5).each { batchNum ->
   tasks.register("testIntegrationBatch${batchNum}", Exec) {
     dependsOn installDevTest
     def cvg_arg = get_coverage_args("intBatch${batchNum}")
diff --git a/metadata-ingestion/setup.cfg b/metadata-ingestion/setup.cfg
index 5d7a7dab4ea5d4..838120312a3e7d 100644
--- a/metadata-ingestion/setup.cfg
+++ b/metadata-ingestion/setup.cfg
@@ -65,6 +65,7 @@ markers =
     integration_batch_2: mark tests to run in batch 2 of integration tests
     integration_batch_3: mark tests to run in batch 3 of integration tests (mostly powerbi)
     integration_batch_4: mark tests to run in batch 4 of integration tests
+    integration_batch_5: mark tests to run in batch 5 of integration tests (mostly powerbi)
 testpaths =
     tests/unit
     tests/integration
diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
index cf22f2583a9c64..7b3732d1f249e6 100644
--- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
+++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py
@@ -25,7 +25,7 @@
     Lineage,
 )
 
-pytestmark = pytest.mark.integration_batch_3
+pytestmark = pytest.mark.integration_batch_5
 
 M_QUERIES = [
     'let\n    Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n    PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n    TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n    TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n    TESTTABLE_Table',

From 7475c5a81ef3671d0a16cdd0a456813ee53cbfc1 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Mon, 28 Jul 2025 10:43:56 -0700
Subject: [PATCH 4/5] move clickhouse, druid, mongodb, vertica tests to batch 2

---
 .../tests/integration/clickhouse/test_clickhouse.py | 1 +
 metadata-ingestion/tests/integration/druid/test_druid.py | 1 +
 metadata-ingestion/tests/integration/mongodb/test_mongodb.py | 3 ++-
metadata-ingestion/tests/integration/vertica/test_vertica.py | 1 + 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/integration/clickhouse/test_clickhouse.py b/metadata-ingestion/tests/integration/clickhouse/test_clickhouse.py index 2526336c38d69d..a6aa64907fd8ea 100644 --- a/metadata-ingestion/tests/integration/clickhouse/test_clickhouse.py +++ b/metadata-ingestion/tests/integration/clickhouse/test_clickhouse.py @@ -7,6 +7,7 @@ from tests.test_helpers.click_helpers import run_datahub_cmd from tests.test_helpers.docker_helpers import wait_for_port +pytestmark = pytest.mark.integration_batch_2 FROZEN_TIME = "2020-04-14 07:00:00" diff --git a/metadata-ingestion/tests/integration/druid/test_druid.py b/metadata-ingestion/tests/integration/druid/test_druid.py index 1f738109d9b368..741c2b5926c033 100644 --- a/metadata-ingestion/tests/integration/druid/test_druid.py +++ b/metadata-ingestion/tests/integration/druid/test_druid.py @@ -7,6 +7,7 @@ from tests.test_helpers.click_helpers import run_datahub_cmd from tests.test_helpers.docker_helpers import wait_for_port +pytestmark = pytest.mark.integration_batch_2 FROZEN_TIME = "2025-02-24 09:00:00" TESTS_DIR = pathlib.Path(__file__).parent GOLDEN_FILES_DIR = TESTS_DIR / "golden" diff --git a/metadata-ingestion/tests/integration/mongodb/test_mongodb.py b/metadata-ingestion/tests/integration/mongodb/test_mongodb.py index 61c60a08802db0..69ee6551b4c095 100644 --- a/metadata-ingestion/tests/integration/mongodb/test_mongodb.py +++ b/metadata-ingestion/tests/integration/mongodb/test_mongodb.py @@ -4,8 +4,9 @@ from datahub.testing import mce_helpers from tests.test_helpers.docker_helpers import wait_for_port +pytestmark = pytest.mark.integration_batch_2 + -@pytest.mark.integration def test_mongodb_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time): test_resources_dir = pytestconfig.rootpath / "tests/integration/mongodb" diff --git a/metadata-ingestion/tests/integration/vertica/test_vertica.py b/metadata-ingestion/tests/integration/vertica/test_vertica.py index a3fb61882d4db1..51f5e46ce57466 100644 --- a/metadata-ingestion/tests/integration/vertica/test_vertica.py +++ b/metadata-ingestion/tests/integration/vertica/test_vertica.py @@ -8,6 +8,7 @@ from tests.test_helpers.click_helpers import run_datahub_cmd from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port +pytestmark = pytest.mark.integration_batch_2 FROZEN_TIME = "2020-04-14 07:00:00" From 5bb54efff3d23a834591d79703f901ab5b4936df Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 28 Jul 2025 15:30:04 -0700 Subject: [PATCH 5/5] update workflow --- .github/workflows/airflow-plugin.yml | 4 ++-- metadata-ingestion-modules/airflow-plugin/build.gradle | 9 +-------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index 796d782890ec8b..22aeeef0b31f89 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -31,6 +31,7 @@ env: jobs: airflow-plugin: runs-on: ${{ vars.DEPOT_PROJECT_ID != '' && 'depot-ubuntu-latest' || 'ubuntu-latest' }} + timeout-minutes: 30 env: DATAHUB_TELEMETRY_ENABLED: false strategy: @@ -64,7 +65,6 @@ jobs: - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - cache: "pip" - name: Install dependencies run: ./metadata-ingestion/scripts/install_deps.sh - name: Install airflow package and test (extras ${{ matrix.extra_pip_requirements }}) @@ -73,7 +73,7 @@ jobs: 
if: always() run: source metadata-ingestion-modules/airflow-plugin/venv/bin/activate && uv pip freeze - uses: actions/upload-artifact@v4 - if: ${{ always() && matrix.python-version == '3.10' && matrix.extra_pip_requirements == 'apache-airflow>=2.7.0' }} + if: ${{ always() && matrix.python-version == '3.10' && matrix.extra_pip_requirements == 'apache-airflow~=2.7.3' }} with: name: Test Results (Airflow Plugin ${{ matrix.python-version}}) path: | diff --git a/metadata-ingestion-modules/airflow-plugin/build.gradle b/metadata-ingestion-modules/airflow-plugin/build.gradle index 5f1240f7fde229..a28767f6036736 100644 --- a/metadata-ingestion-modules/airflow-plugin/build.gradle +++ b/metadata-ingestion-modules/airflow-plugin/build.gradle @@ -37,7 +37,7 @@ task environmentSetup(type: Exec) { "touch ${sentinel_file}" } -task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) { +task install(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) { def sentinel_file = "${venv_name}/.build_install_package_sentinel" inputs.file file('setup.py') outputs.file(sentinel_file) @@ -47,8 +47,6 @@ task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingesti "touch ${sentinel_file}" } -task install(dependsOn: [installPackage]) - task installDev(type: Exec, dependsOn: [install]) { def sentinel_file = "${venv_name}/.build_install_dev_sentinel" inputs.file file('setup.py') @@ -71,12 +69,7 @@ task installTest(type: Exec, dependsOn: [installDev]) { } task lint(type: Exec, dependsOn: installDev) { - /* - The find/sed combo below is a temporary work-around for the following mypy issue with airflow 2.2.0: - "venv/lib/python3.8/site-packages/airflow/_vendor/connexion/spec.py:169: error: invalid syntax". - */ commandLine 'bash', '-c', - "find ${venv_name}/lib -path *airflow/_vendor/connexion/spec.py -exec sed -i.bak -e '169,169s/ # type: List\\[str\\]//g' {} \\; && " + venv_activate_command + "ruff check src/ tests/ && " + "ruff format --check src/ tests/ && " +
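
For local verification of the new batch layout, each batch can be run either through its registered Gradle task or by invoking pytest with the corresponding marker. A minimal sketch, assuming the dev environment from installDevTest has already been built and the virtualenv lives at the default metadata-ingestion/venv location:

    # Run one batch via the Gradle task registered in metadata-ingestion/build.gradle
    ./gradlew :metadata-ingestion:testIntegrationBatch3

    # Roughly equivalent direct invocation, using the marker declared in setup.cfg
    cd metadata-ingestion
    source venv/bin/activate
    pytest -m 'integration_batch_3' --junit-xml=junit.integrationbatch3.xml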