feat(ingest): improve CI batches #14239


Merged: 6 commits, Jul 28, 2025
4 changes: 2 additions & 2 deletions .github/workflows/airflow-plugin.yml


🚫 [actionlint] reported by reviewdog 🐶
property "extra_pip_extras" is not defined in object type {extra_pip_constraints: string; extra_pip_requirements: string; python-version: number} [expression]

run: ./gradlew -Pextra_pip_requirements='${{ matrix.extra_pip_requirements }}' -Pextra_pip_constraints='${{ matrix.extra_pip_constraints }}' -Pextra_pip_extras='${{ matrix.extra_pip_extras }}' :metadata-ingestion-modules:airflow-plugin:build

@@ -31,6 +31,7 @@ env:
jobs:
airflow-plugin:
runs-on: ${{ vars.DEPOT_PROJECT_ID != '' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 30
env:
DATAHUB_TELEMETRY_ENABLED: false
strategy:
@@ -64,7 +65,6 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install airflow package and test (extras ${{ matrix.extra_pip_requirements }})
@@ -73,7 +73,7 @@
if: always()
run: source metadata-ingestion-modules/airflow-plugin/venv/bin/activate && uv pip freeze
- uses: actions/upload-artifact@v4
if: ${{ always() && matrix.python-version == '3.10' && matrix.extra_pip_requirements == 'apache-airflow>=2.7.0' }}
if: ${{ always() && matrix.python-version == '3.10' && matrix.extra_pip_requirements == 'apache-airflow~=2.7.3' }}
with:
name: Test Results (Airflow Plugin ${{ matrix.python-version}})
path: |
18 changes: 12 additions & 6 deletions .github/workflows/metadata-ingestion.yml
@@ -1,4 +1,4 @@
name: metadata ingestion
name: Metadata Ingestion
on:
push:
branches:
@@ -29,7 +29,7 @@ env:
DEPOT_PROJECT_ID: "${{ vars.DEPOT_PROJECT_ID }}"

jobs:
metadata-ingestion:
ci:
runs-on: ${{ vars.DEPOT_PROJECT_ID != '' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
timeout-minutes: 60
env:
@@ -38,14 +38,20 @@ jobs:
# DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
strategy:
matrix:
python-version: ["3.9", "3.11"]
command:
[
"testQuick",
python-version: ["3.11"]
command: [
"testQuick", # also runs lint
"testIntegrationBatch0",
"testIntegrationBatch1",
"testIntegrationBatch2",
"testIntegrationBatch3",
"testIntegrationBatch4",
"testIntegrationBatch5",
]
include:
# Version compatibility tests.
- python-version: "3.9"
command: "testQuick"
fail-fast: false
steps:
- name: Free up disk space
9 changes: 1 addition & 8 deletions metadata-ingestion-modules/airflow-plugin/build.gradle
@@ -37,7 +37,7 @@ task environmentSetup(type: Exec) {
"touch ${sentinel_file}"
}

task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) {
task install(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) {
def sentinel_file = "${venv_name}/.build_install_package_sentinel"
inputs.file file('setup.py')
outputs.file(sentinel_file)
@@ -47,8 +47,6 @@ task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) {
"touch ${sentinel_file}"
}

task install(dependsOn: [installPackage])

task installDev(type: Exec, dependsOn: [install]) {
def sentinel_file = "${venv_name}/.build_install_dev_sentinel"
inputs.file file('setup.py')
@@ -71,12 +69,7 @@ task installTest(type: Exec, dependsOn: [installDev]) {
}

task lint(type: Exec, dependsOn: installDev) {
/*
The find/sed combo below is a temporary work-around for the following mypy issue with airflow 2.2.0:
"venv/lib/python3.8/site-packages/airflow/_vendor/connexion/spec.py:169: error: invalid syntax".
*/
commandLine 'bash', '-c',
"find ${venv_name}/lib -path *airflow/_vendor/connexion/spec.py -exec sed -i.bak -e '169,169s/ # type: List\\[str\\]//g' {} \\; && " +
venv_activate_command +
"ruff check src/ tests/ && " +
"ruff format --check src/ tests/ && " +
26 changes: 9 additions & 17 deletions metadata-ingestion/build.gradle
@@ -191,23 +191,15 @@ task testSingle(dependsOn: [installDevTest]) {
}
}

task testIntegrationBatch0(type: Exec, dependsOn: [installDevTest]) {
def cvg_arg = get_coverage_args("intBatch0")
commandLine 'bash', '-c',
venv_activate_command +
"${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_0' --junit-xml=junit.integrationbatch0.xml"
}
task testIntegrationBatch1(type: Exec, dependsOn: [installDevTest]) {
def cvg_arg = get_coverage_args("intBatch1")
commandLine 'bash', '-c',
venv_activate_command +
"${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_1' --junit-xml=junit.integrationbatch1.xml"
}
task testIntegrationBatch2(type: Exec, dependsOn: [installDevTest]) {
def cvg_arg = get_coverage_args("intBatch2")
commandLine 'bash', '-c',
venv_activate_command +
"${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_2' --junit-xml=junit.integrationbatch2.xml"
// Create testIntegrationBatch0 through testIntegrationBatch5 tasks
(0..5).each { batchNum ->
tasks.register("testIntegrationBatch${batchNum}", Exec) {
dependsOn installDevTest
def cvg_arg = get_coverage_args("intBatch${batchNum}")
commandLine 'bash', '-c',
venv_activate_command +
"${pytest_default_env} pytest ${cvg_arg} ${pytest_default_args} -m 'integration_batch_${batchNum}' --junit-xml=junit.integrationbatch${batchNum}.xml"
}
}

task testFull(type: Exec, dependsOn: [installDevTest]) {
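The (0..5).each loop registers six tasks, replacing the three hand-written batch tasks. In Python terms, each generated task boils down to roughly the following sketch; it deliberately omits the venv activation, coverage argument, and default pytest env/args that the real build script wires in:

```python
# Rough Python equivalent of what each generated testIntegrationBatchN task runs.
import subprocess


def run_integration_batch(batch_num: int) -> None:
    subprocess.run(
        [
            "pytest",
            "-m", f"integration_batch_{batch_num}",  # marker selects the batch
            f"--junit-xml=junit.integrationbatch{batch_num}.xml",
        ],
        check=True,
    )


if __name__ == "__main__":
    run_integration_batch(0)
```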
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.cfg
@@ -63,6 +63,9 @@ markers =
integration_batch_0: mark tests to run in batch 0 of integration tests. This is done mainly for parallelization in CI. Batch 0 is the default batch.
integration_batch_1: mark tests to run in batch 1 of integration tests
integration_batch_2: mark tests to run in batch 2 of integration tests
integration_batch_3: mark tests to run in batch 3 of integration tests (mostly powerbi)
Contributor: Is it true there are two mostly-powerbi batches?

Collaborator (author): Yeah. Somehow the M query parser is extremely slow, so the powerbi tests alone take ~24 minutes; that's why I split them into two batches.

integration_batch_4: mark tests to run in batch 4 of integration tests
integration_batch_5: mark tests to run in batch 5 of integration tests (mostly powerbi)
testpaths =
tests/unit
tests/integration
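These markers are consumed both by the Gradle batch tasks and by the test modules changed later in this diff. A minimal sketch of how a module opts into a batch (the test names here are hypothetical):

```python
import pytest

# Module-level: every test in this file runs in integration batch 3.
pytestmark = pytest.mark.integration_batch_3


def test_example_ingest():
    ...


# Equivalent per-test form, if only some tests belong to a batch:
@pytest.mark.integration_batch_5
def test_another_ingest():
    ...
```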
@@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union

import pydantic
import pytest
from pydantic.class_validators import validator
from vertica_sqlalchemy_dialect.base import VerticaInspector

@@ -55,6 +56,8 @@

if TYPE_CHECKING:
from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest

pytestmark = pytest.mark.integration_batch_4
logger: logging.Logger = logging.getLogger(__name__)


3 changes: 2 additions & 1 deletion metadata-ingestion/tests/conftest.py
@@ -73,7 +73,8 @@ def pytest_collection_modifyitems(
if (
"docker_compose_runner" in item.fixturenames # type: ignore[attr-defined]
or any(
marker.name == "integration_batch_2" for marker in item.iter_markers()
marker.name.startswith("integration_batch_")
for marker in item.iter_markers()
)
):
item.add_marker(pytest.mark.slow)
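Pieced together from the visible fragment, the updated hook looks roughly like this (the surrounding signature and loop are assumptions based on the hunk; only the condition is shown verbatim in the diff):

```python
# Approximate shape of the updated conftest.py hook: any test that uses the
# docker_compose_runner fixture or carries any integration_batch_* marker is
# automatically tagged as slow.
import pytest


def pytest_collection_modifyitems(config, items):
    for item in items:
        if "docker_compose_runner" in item.fixturenames or any(
            marker.name.startswith("integration_batch_")
            for marker in item.iter_markers()
        ):
            item.add_marker(pytest.mark.slow)
```

Presumably this keeps an invocation like `pytest -m "not slow"` excluding every integration batch without having to list each marker individually.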
@@ -13,6 +13,8 @@

_resources_dir = pathlib.Path(__file__).parent

pytestmark = pytest.mark.integration_batch_4


@pytest.mark.integration
def test_cassandra_ingest(docker_compose_runner, pytestconfig, tmp_path, monkeypatch):
@@ -7,6 +7,7 @@
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_2
FROZEN_TIME = "2020-04-14 07:00:00"


2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/dremio/test_dremio.py
@@ -12,6 +12,8 @@
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_4

FROZEN_TIME = "2023-10-15 07:00:00"
MINIO_PORT = 9000
MYSQL_PORT = 3306
1 change: 1 addition & 0 deletions metadata-ingestion/tests/integration/druid/test_druid.py
@@ -7,6 +7,7 @@
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_2
FROZEN_TIME = "2025-02-24 09:00:00"
TESTS_DIR = pathlib.Path(__file__).parent
GOLDEN_FILES_DIR = TESTS_DIR / "golden"
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/integration/hana/test_hana.py
@@ -7,7 +7,7 @@
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_2
pytestmark = pytest.mark.integration_batch_4
FROZEN_TIME = "2020-04-14 07:00:00"


5 changes: 2 additions & 3 deletions metadata-ingestion/tests/integration/kafka/test_kafka.py
@@ -15,6 +15,8 @@
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

pytestmark = pytest.mark.integration_batch_4

FROZEN_TIME = "2020-04-14 07:00:00"


@@ -41,7 +43,6 @@ def mock_kafka_service(docker_compose_runner, test_resources_dir):

@pytest.mark.parametrize("approach", ["kafka_without_schemas", "kafka"])
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_ingest(
mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time, approach
):
@@ -81,7 +82,6 @@ def test_kafka_ingest(
),
],
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):
report = test_connection_helpers.run_test_connection(KafkaSource, config_dict)
@@ -104,7 +104,6 @@ def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_oauth_callback(
mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time
):
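The per-test @pytest.mark.integration decorators removed from the kafka tests become redundant once the module carries a module-level marker, presumably because CI selection now happens through the batch markers. A minimal sketch of the equivalence (test name abbreviated; the exact selection semantics are an assumption based on the setup.cfg markers above):

```python
import pytest

# A module-level pytestmark applies its marker to every test in the file,
# so repeating a marker decorator on each test function adds nothing.
pytestmark = pytest.mark.integration_batch_4


def test_kafka_ingest():  # implicitly carries integration_batch_4
    ...
```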