diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index e1d9918ea8..896e69685a 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -408,6 +408,7 @@ jobs:
           CI_MONGO_URI: mongodb://localhost:27017/?replicaSet=rs0&authSource=admin
           CI_MONGO_USERNAME: "csuser"
           CI_MONGO_PASSWORD: "cspass"
+          CI_CLICKHOUSE_CLUSTER: "true"
           ENABLE_OTEL_METRICS: ${{ (matrix.db-version.pg == '16' || matrix.db-version.mysql == 'mysql-pos') && 'true' || 'false' }}
           OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://localhost:4317
           OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: grpc
diff --git a/docker-compose-e2e.yml b/docker-compose-e2e.yml
new file mode 100644
index 0000000000..c753b7725d
--- /dev/null
+++ b/docker-compose-e2e.yml
@@ -0,0 +1,171 @@
+name: peerdb-quickstart-e2e
+
+x-minio-config: &minio-config
+  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: _peerdb_minioadmin
+  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: _peerdb_minioadmin
+  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION: us-east-1
+  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ENDPOINT_URL_S3: http://host.docker.internal:9001
+  PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME: peerdbbucket
+
+x-catalog-config: &catalog-config
+  PEERDB_CATALOG_HOST: catalog-e2e
+  PEERDB_CATALOG_PORT: 5432
+  PEERDB_CATALOG_USER: postgres
+  PEERDB_CATALOG_PASSWORD: postgres
+  PEERDB_CATALOG_DATABASE: postgres
+
+services:
+  catalog-e2e:
+    container_name: catalog-e2e
+    image: postgres:17-alpine@sha256:fbe21607052bb5c298674f2fd8cf044a63aa3ddf50b81627f894f91f40f50bcb
+    command: -c config_file=/etc/postgresql.conf
+    ports:
+      - 9901:5432
+    environment:
+      PGUSER: postgres
+      POSTGRES_PASSWORD: postgres
+      POSTGRES_DB: postgres
+      POSTGRES_INITDB_ARGS: --locale=C.UTF-8
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    volumes:
+      - pgdata-e2e:/var/lib/postgresql/data
+      - ./volumes/postgresql.conf:/etc/postgresql.conf
+      - ./volumes/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
+    healthcheck:
+      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
+      interval: 10s
+      timeout: 30s
+      retries: 5
+      start_period: 60s
+
+  temporal-e2e:
+    container_name: temporal-e2e
+    depends_on:
+      catalog-e2e:
+        condition: service_healthy
+    environment:
+      - DB=postgres12
+      - DB_PORT=5432
+      - POSTGRES_USER=postgres
+      - POSTGRES_PWD=postgres
+      - POSTGRES_SEEDS=catalog-e2e
+      - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
+    image: temporalio/auto-setup:1.28@sha256:3ee84bf3ec5494f2be6ee0c1fea3b52684e50da3ddc5f997e6fa56ad340da9a8
+    ports:
+      - 7233:7233
+    volumes:
+      - ./volumes/temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
+    labels:
+      kompose.volume.type: configMap
+
+  temporal-admin-tools-e2e:
+    container_name: temporal-admin-tools-e2e
+    depends_on:
+      - temporal-e2e
+    environment:
+      - TEMPORAL_ADDRESS=temporal-e2e:7233
+      - TEMPORAL_CLI_ADDRESS=temporal-e2e:7233
+      - TEMPORAL_CLI_SHOW_STACKS=1
+    image: temporalio/admin-tools:1.25.2-tctl-1.18.1-cli-1.1.1@sha256:da0c7a7982b571857173ab8f058e7f139b3054800abb4dcb100445d29a563ee8
+    stdin_open: true
+    tty: true
+    entrypoint: /etc/temporal/entrypoint.sh
+    restart: on-failure
+    healthcheck:
+      test: ["CMD", "tctl", "workflow", "list"]
+      interval: 1s
+      timeout: 5s
+      retries: 30
+    volumes:
+      - ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh
+
+  temporal-ui-e2e:
+    container_name: temporal-ui-e2e
+    depends_on:
+      - temporal-e2e
+    environment:
+      - TEMPORAL_ADDRESS=temporal-e2e:7233
+      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
+      - TEMPORAL_CSRF_COOKIE_INSECURE=true
+    image: temporalio/ui:2.39.0@sha256:b768f87f18b59663a6749e98a2f7782c266e8e4e4749f92248e2ba41d6330d3f
+    ports:
+      - 8085:8080
+
+  peerdb-e2e:
+    container_name: peerdb-server-e2e
+    stop_signal: SIGINT
+    build:
+      context: .
+      dockerfile: stacks/peerdb-server.Dockerfile
+    environment:
+      <<: *catalog-config
+      PEERDB_PASSWORD: peerdb
+      PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112
+      RUST_LOG: info
+      RUST_BACKTRACE: 1
+    ports:
+      - 9900:9900
+    depends_on:
+      catalog-e2e:
+        condition: service_healthy
+
+  minio-e2e:
+    image: bitnami/minio:2025.6.13
+    volumes:
+      - minio-data-e2e:/data
+    ports:
+      - "9001:9000"
+      - "9002:36987"
+    environment:
+      MINIO_ROOT_USER: _peerdb_minioadmin
+      MINIO_ROOT_PASSWORD: _peerdb_minioadmin
+      MINIO_CONSOLE_PORT_NUMBER: 36987
+      MINIO_DEFAULT_BUCKETS: peerdbbucket
+
+  mysql-e2e:
+    container_name: mysql-e2e
+    build:
+      context: .
+      dockerfile: stacks/mysql.Dockerfile
+    ports:
+      - 3306:3306
+    environment:
+      MYSQL_ROOT_PASSWORD: example
+    volumes:
+      - mydata-e2e:/var/lib/mysql
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+
+  clickhouse-e2e:
+    image: clickhouse/clickhouse-server
+    ports:
+      - 9000:9000
+    environment:
+      CLICKHOUSE_PASSWORD: example
+    volumes:
+      - chdata-e2e:/var/lib/clickhouse
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+
+volumes:
+  pgdata-e2e:
+    driver_opts:
+      type: tmpfs
+      device: tmpfs
+  mydata-e2e:
+    driver_opts:
+      type: tmpfs
+      device: tmpfs
+  chdata-e2e:
+    driver_opts:
+      type: tmpfs
+      device: tmpfs
+  minio-data-e2e:
+    driver_opts:
+      type: tmpfs
+      device: tmpfs
+
+networks:
+  default:
+    name: peerdb_network
diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go
index 833d510cc2..2b1d443638 100644
--- a/flow/e2e/clickhouse/clickhouse.go
+++ b/flow/e2e/clickhouse/clickhouse.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math/big"
+	"os"
 	"reflect"
 	"strings"
 	"testing"
@@ -65,6 +66,10 @@ func (s ClickHouseSuite) Suffix() string {
 }
 
 func (s ClickHouseSuite) Peer() *protos.Peer {
+	host := os.Getenv("CI_CLICKHOUSE_HOST")
+	if host == "" {
+		host = "localhost"
+	}
 	dbname := "e2e_test_" + s.suffix
 	if s.cluster {
 		ret := &protos.Peer{
@@ -72,9 +77,10 @@ func (s ClickHouseSuite) Peer() *protos.Peer {
 			Type: protos.DBType_CLICKHOUSE,
 			Config: &protos.Peer_ClickhouseConfig{
 				ClickhouseConfig: &protos.ClickhouseConfig{
-					Host:       "localhost",
+					Host:       host,
 					Port:       9001,
 					Database:   dbname,
+					Password:   os.Getenv("CI_CLICKHOUSE_PASSWORD"),
 					DisableTls: true,
 					S3:         s.s3Helper.S3Config,
 					Cluster:    "cicluster",
@@ -90,14 +96,19 @@ func (s ClickHouseSuite) Peer() *protos.Peer {
 }
 
 func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer {
+	host := os.Getenv("CI_CLICKHOUSE_HOST")
+	if host == "" {
+		host = "localhost"
+	}
 	ret := &protos.Peer{
 		Name: e2e.AddSuffix(s, dbname),
 		Type: protos.DBType_CLICKHOUSE,
 		Config: &protos.Peer_ClickhouseConfig{
 			ClickhouseConfig: &protos.ClickhouseConfig{
-				Host:       "localhost",
+				Host:       host,
 				Port:       9000,
 				Database:   dbname,
+				Password:   os.Getenv("CI_CLICKHOUSE_PASSWORD"),
 				DisableTls: true,
 				S3:         s.s3Helper.S3Config,
 			},
@@ -375,7 +386,7 @@ func SetupSuite[TSource e2e.SuiteSource](
 	t.Helper()
 
 	source, suffix, err := setupSource(t)
-	require.NoError(t, err, "failed to setup postgres")
+	require.NoError(t, err, "failed to setup source")
 
 	s3Helper, err := e2e_s3.NewS3TestHelper(t.Context(), e2e_s3.Minio)
 	require.NoError(t, err, "failed to setup S3")
diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go
index d8ff52d8a2..17a3fb531e 100644
--- a/flow/e2e/clickhouse/peer_flow_ch_test.go
+++ b/flow/e2e/clickhouse/peer_flow_ch_test.go
@@ -4,6 +4,7 @@ import (
 	"embed"
 	"fmt"
 	"math/big"
+	"os"
 	"reflect"
 	"regexp"
 	"strconv"
@@ -41,6 +42,10 @@ func TestPeerFlowE2ETestSuitePG_CH(t *testing.T) {
 }
 
 func TestPeerFlowE2ETestSuiteMySQL_CH(t *testing.T) {
+	if os.Getenv("CI_MYSQL_VERSION") == "" {
+		t.Skip()
+	}
+
 	e2eshared.RunSuite(t, SetupSuite(t, false, func(t *testing.T) (*e2e.MySqlSource, string, error) {
 		t.Helper()
 		suffix := "mych_" + strings.ToLower(shared.RandomString(8))
@@ -50,6 +55,10 @@ func TestPeerFlowE2ETestSuiteMySQL_CH(t *testing.T) {
 }
 
 func TestPeerFlowE2ETestSuitePG_CH_Cluster(t *testing.T) {
+	if os.Getenv("CI_CLICKHOUSE_CLUSTER") == "" {
+		t.Skip()
+	}
+
 	e2eshared.RunSuite(t, SetupSuite(t, true, func(t *testing.T) (*e2e.PostgresSource, string, error) {
 		t.Helper()
 		suffix := "pgchcl_" + strings.ToLower(shared.RandomString(8))
@@ -59,6 +68,10 @@ func TestPeerFlowE2ETestSuitePG_CH_Cluster(t *testing.T) {
 }
 
 func TestPeerFlowE2ETestSuiteMySQL_CH_Cluster(t *testing.T) {
+	if os.Getenv("CI_CLICKHOUSE_CLUSTER") == "" || os.Getenv("CI_MYSQL_VERSION") == "" {
+		t.Skip()
+	}
+
 	e2eshared.RunSuite(t, SetupSuite(t, true, func(t *testing.T) (*e2e.MySqlSource, string, error) {
 		t.Helper()
 		suffix := "mychcl_" + strings.ToLower(shared.RandomString(8))
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index ad6b721c52..6902f9c690 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"os"
 	"slices"
 	"strings"
 	"testing"
@@ -600,8 +601,13 @@ func NewTemporalClient(t *testing.T) client.Client {
 		),
 	))
 
+	hostport := os.Getenv("TEMPORAL_HOST_PORT")
+	if hostport == "" {
+		hostport = "localhost:7233"
+	}
+
 	tc, err := client.Dial(client.Options{
-		HostPort: "localhost:7233",
+		HostPort: hostport,
 		Logger:   logger,
 	})
 	if err != nil {
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 7cd3b417d7..9d907034cc 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -780,7 +780,7 @@ func CDCFlowWorkflow(
 				"panic in sync flow",
 				slog.Any("error", panicErr.Error()),
 				slog.String("stack", panicErr.StackTrace()),
-				slog.Any("sleepFor", sleepFor),
+				slog.Duration("sleepFor", sleepFor),
 			)
 		} else {
 			// cannot use shared.IsSQLStateError because temporal serialize/deserialize
diff --git a/local-ci/db.sh b/local-ci/db.sh
new file mode 100755
index 0000000000..5ba10c1e2a
--- /dev/null
+++ b/local-ci/db.sh
@@ -0,0 +1,31 @@
+#!/bin/sh
+set -Eeu
+
+cd "$(dirname "$0")"/..
+
+DOCKER="docker"
+EXTRA_ARGS="--no-attach temporal-e2e --no-attach temporal-ui-e2e"
+PODMAN_ARGS=""
+
+if test -z "${USE_PODMAN:=}"
+then
+  # exit status 0 means docker compose is available; otherwise fall back to podman
+  if ! docker compose version >/dev/null 2>&1; then
+    if podman compose version >/dev/null 2>&1; then
+      echo "docker compose could not be found on PATH, using podman compose"
+      USE_PODMAN=1
+    else
+      echo "docker compose could not be found on PATH"
+      exit 1
+    fi
+  fi
+fi
+
+if test -n "$USE_PODMAN"; then
+  DOCKER="podman"
+  EXTRA_ARGS=""
+  PODMAN_ARGS="--podman-run-args=--replace"
+fi
+
+export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
+exec $DOCKER compose $PODMAN_ARGS -f docker-compose-e2e.yml up --build $EXTRA_ARGS
diff --git a/local-ci/env b/local-ci/env
new file mode 100644
index 0000000000..ab7c4d71ca
--- /dev/null
+++ b/local-ci/env
@@ -0,0 +1,17 @@
+export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID=_peerdb_minioadmin
+export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY=_peerdb_minioadmin
+export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION=us-east-1
+export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ENDPOINT_URL_S3=http://localhost:9001
+export PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME=peerdbbucket
+export PEERDB_CATALOG_HOST=localhost
+export PEERDB_CATALOG_PORT=9901
+export PEERDB_CATALOG_USER=postgres
+export PEERDB_CATALOG_PASSWORD=postgres
+export PEERDB_CATALOG_DATABASE=postgres
+export TEMPORAL_HOST_PORT=localhost:7233
+export ENABLE_OTEL_METRICS=false
+export AWS_ACCESS_KEY_ID=_peerdb_minioadmin
+export AWS_SECRET_ACCESS_KEY=_peerdb_minioadmin
+export AWS_REGION=us-east-1
+export AWS_ENDPOINT_URL_S3=http://localhost:9001
+export CI_CLICKHOUSE_PASSWORD=example
diff --git a/local-ci/flow-worker.sh b/local-ci/flow-worker.sh
new file mode 100755
index 0000000000..ab641a4925
--- /dev/null
+++ b/local-ci/flow-worker.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+cd "$(dirname "$0")"
+. ./env
+cd ../flow
+exec go run . worker
diff --git a/local-ci/snapshot-worker.sh b/local-ci/snapshot-worker.sh
new file mode 100755
index 0000000000..3a73e271ad
--- /dev/null
+++ b/local-ci/snapshot-worker.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+cd "$(dirname "$0")"
+. ./env
+cd ../flow
+exec go run . snapshot-worker
diff --git a/local-ci/test.sh b/local-ci/test.sh
new file mode 100755
index 0000000000..31c13a2e7a
--- /dev/null
+++ b/local-ci/test.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+cd "$(dirname "$0")"
+. ./env
+cd ../flow
+exec go test -v -parallel 1 -p 1 ./e2e/clickhouse/...
diff --git a/volumes/postgresql.conf b/volumes/postgresql.conf
index 36980f0ae7..2cb654880c 100644
--- a/volumes/postgresql.conf
+++ b/volumes/postgresql.conf
@@ -1,5 +1,6 @@
 listen_addresses = '*'
+max_connections = 1000
 wal_level = logical
 max_wal_senders = 4
 max_replication_slots = 4
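
A minimal local run with the pieces above might look like the following sketch; the ordering (stack first, workers next, tests last) is inferred from what each script does, not stated anywhere in the diff:

    # shell 1: bring up catalog, temporal, minio, mysql, and clickhouse in the foreground
    ./local-ci/db.sh

    # shells 2 and 3: start the workers against the stack (both source local-ci/env)
    ./local-ci/flow-worker.sh
    ./local-ci/snapshot-worker.sh

    # shell 4: run the ClickHouse e2e suites serially
    ./local-ci/test.sh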