Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ jobs:
CI_MONGO_URI: mongodb://localhost:27017/?replicaSet=rs0&authSource=admin
CI_MONGO_USERNAME: "csuser"
CI_MONGO_PASSWORD: "cspass"
CI_CLICKHOUSE_CLUSTER: "true"
ENABLE_OTEL_METRICS: ${{ (matrix.db-version.pg == '16' || matrix.db-version.mysql == 'mysql-pos') && 'true' || 'false' }}
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://localhost:4317
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: grpc
Expand Down
171 changes: 171 additions & 0 deletions docker-compose-e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# Compose stack used by the local-ci scripts to mirror the CI e2e environment.
name: peerdb-quickstart-e2e

# Shared MinIO credentials for ClickHouse S3 staging.
# NOTE(review): this anchor is never referenced below (*minio-config unused) —
# confirm whether a service is meant to merge it, or whether it is kept only
# as documentation of the values exported in local-ci/env.
x-minio-config: &minio-config
  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID: _peerdb_minioadmin
  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY: _peerdb_minioadmin
  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION: us-east-1
  PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ENDPOINT_URL_S3: http://host.docker.internal:9001
  PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME: peerdbbucket

# Catalog connection settings, merged into services that talk to the catalog.
x-catalog-config: &catalog-config
  PEERDB_CATALOG_HOST: catalog-e2e
  PEERDB_CATALOG_PORT: 5432
  PEERDB_CATALOG_USER: postgres
  PEERDB_CATALOG_PASSWORD: postgres
  PEERDB_CATALOG_DATABASE: postgres

services:
  catalog-e2e:
    container_name: catalog-e2e
    image: postgres:17-alpine@sha256:fbe21607052bb5c298674f2fd8cf044a63aa3ddf50b81627f894f91f40f50bcb
    command: -c config_file=/etc/postgresql.conf
    ports:
      # Quoted to avoid YAML's colon-separated-scalar pitfalls in port maps.
      - "9901:5432"
    environment:
      PGUSER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
      POSTGRES_INITDB_ARGS: --locale=C.UTF-8
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - pgdata-e2e:/var/lib/postgresql/data
      - ./volumes/postgresql.conf:/etc/postgresql.conf
      - ./volumes/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
    healthcheck:
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
      interval: 10s
      timeout: 30s
      retries: 5
      start_period: 60s

  temporal-e2e:
    container_name: temporal-e2e
    depends_on:
      catalog-e2e:
        condition: service_healthy
    environment:
      - DB=postgres12
      - DB_PORT=5432
      - POSTGRES_USER=postgres
      - POSTGRES_PWD=postgres
      - POSTGRES_SEEDS=catalog-e2e
      - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
    image: temporalio/auto-setup:1.28@sha256:3ee84bf3ec5494f2be6ee0c1fea3b52684e50da3ddc5f997e6fa56ad340da9a8
    ports:
      - "7233:7233"
    volumes:
      - ./volumes/temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
    labels:
      kompose.volume.type: configMap

  temporal-admin-tools-e2e:
    container_name: temporal-admin-tools-e2e
    depends_on:
      - temporal-e2e
    environment:
      - TEMPORAL_ADDRESS=temporal-e2e:7233
      - TEMPORAL_CLI_ADDRESS=temporal-e2e:7233
      - TEMPORAL_CLI_SHOW_STACKS=1
    image: temporalio/admin-tools:1.25.2-tctl-1.18.1-cli-1.1.1@sha256:da0c7a7982b571857173ab8f058e7f139b3054800abb4dcb100445d29a563ee8
    stdin_open: true
    tty: true
    # Custom entrypoint registers the mirror-name search attribute.
    entrypoint: /etc/temporal/entrypoint.sh
    restart: on-failure
    healthcheck:
      test: ["CMD", "tctl", "workflow", "list"]
      interval: 1s
      timeout: 5s
      retries: 30
    volumes:
      - ./scripts/mirror-name-search.sh:/etc/temporal/entrypoint.sh

  temporal-ui-e2e:
    container_name: temporal-ui-e2e
    depends_on:
      - temporal-e2e
    environment:
      - TEMPORAL_ADDRESS=temporal-e2e:7233
      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
      - TEMPORAL_CSRF_COOKIE_INSECURE=true
    image: temporalio/ui:2.39.0@sha256:b768f87f18b59663a6749e98a2f7782c266e8e4e4749f92248e2ba41d6330d3f
    ports:
      - "8085:8080"

  peerdb-e2e:
    container_name: peerdb-server-e2e
    stop_signal: SIGINT
    build:
      context: .
      dockerfile: stacks/peerdb-server.Dockerfile
    environment:
      <<: *catalog-config
      PEERDB_PASSWORD: peerdb
      PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112
      RUST_LOG: info
      RUST_BACKTRACE: 1
    ports:
      - "9900:9900"
    depends_on:
      catalog-e2e:
        condition: service_healthy

  minio-e2e:
    image: bitnami/minio:2025.6.13
    volumes:
      - minio-data-e2e:/data
    ports:
      - "9001:9000"
      - "9002:36987"
    environment:
      MINIO_ROOT_USER: _peerdb_minioadmin
      MINIO_ROOT_PASSWORD: _peerdb_minioadmin
      MINIO_CONSOLE_PORT_NUMBER: 36987
      MINIO_DEFAULT_BUCKETS: peerdbbucket

  mysql-e2e:
    container_name: mysql-e2e
    build:
      context: .
      dockerfile: stacks/mysql.Dockerfile
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: example
    volumes:
      - mydata-e2e:/var/lib/mysql
    extra_hosts:
      - "host.docker.internal:host-gateway"

  clickhouse-e2e:
    # NOTE(review): unlike every other image in this file, this one is neither
    # tag- nor digest-pinned — confirm whether :latest drift is acceptable here.
    image: clickhouse/clickhouse-server
    ports:
      - "9000:9000"
    environment:
      CLICKHOUSE_PASSWORD: example
    volumes:
      - chdata-e2e:/var/lib/clickhouse
    extra_hosts:
      - "host.docker.internal:host-gateway"

# All data volumes are tmpfs-backed: e2e runs are throwaway, so nothing
# persists across `compose down`.
volumes:
  pgdata-e2e:
    driver_opts:
      type: tmpfs
      device: tmpfs
  mydata-e2e:
    driver_opts:
      type: tmpfs
      device: tmpfs
  chdata-e2e:
    driver_opts:
      type: tmpfs
      device: tmpfs
  minio-data-e2e:
    driver_opts:
      type: tmpfs
      device: tmpfs

networks:
  default:
    name: peerdb_network
17 changes: 14 additions & 3 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"os"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -65,16 +66,21 @@ func (s ClickHouseSuite) Suffix() string {
}

func (s ClickHouseSuite) Peer() *protos.Peer {
host := os.Getenv("CI_CLICKHOUSE_HOST")
if host == "" {
host = "localhost"
}
dbname := "e2e_test_" + s.suffix
if s.cluster {
ret := &protos.Peer{
Name: e2e.AddSuffix(s, dbname),
Type: protos.DBType_CLICKHOUSE,
Config: &protos.Peer_ClickhouseConfig{
ClickhouseConfig: &protos.ClickhouseConfig{
Host: "localhost",
Host: host,
Port: 9001,
Database: dbname,
Password: os.Getenv("CI_CLICKHOUSE_PASSWORD"),
DisableTls: true,
S3: s.s3Helper.S3Config,
Cluster: "cicluster",
Expand All @@ -90,14 +96,19 @@ func (s ClickHouseSuite) Peer() *protos.Peer {
}

func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer {
host := os.Getenv("CI_CLICKHOUSE_HOST")
if host == "" {
host = "localhost"
}
ret := &protos.Peer{
Name: e2e.AddSuffix(s, dbname),
Type: protos.DBType_CLICKHOUSE,
Config: &protos.Peer_ClickhouseConfig{
ClickhouseConfig: &protos.ClickhouseConfig{
Host: "localhost",
Host: host,
Port: 9000,
Database: dbname,
Password: os.Getenv("CI_CLICKHOUSE_PASSWORD"),
DisableTls: true,
S3: s.s3Helper.S3Config,
},
Expand Down Expand Up @@ -375,7 +386,7 @@ func SetupSuite[TSource e2e.SuiteSource](
t.Helper()

source, suffix, err := setupSource(t)
require.NoError(t, err, "failed to setup postgres")
require.NoError(t, err, "failed to setup source")

s3Helper, err := e2e_s3.NewS3TestHelper(t.Context(), e2e_s3.Minio)
require.NoError(t, err, "failed to setup S3")
Expand Down
13 changes: 13 additions & 0 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"embed"
"fmt"
"math/big"
"os"
"reflect"
"regexp"
"strconv"
Expand Down Expand Up @@ -41,6 +42,10 @@ func TestPeerFlowE2ETestSuitePG_CH(t *testing.T) {
}

func TestPeerFlowE2ETestSuiteMySQL_CH(t *testing.T) {
if os.Getenv("CI_MYSQL_VERSION") == "" {
t.Skip()
}

e2eshared.RunSuite(t, SetupSuite(t, false, func(t *testing.T) (*e2e.MySqlSource, string, error) {
t.Helper()
suffix := "mych_" + strings.ToLower(shared.RandomString(8))
Expand All @@ -50,6 +55,10 @@ func TestPeerFlowE2ETestSuiteMySQL_CH(t *testing.T) {
}

func TestPeerFlowE2ETestSuitePG_CH_Cluster(t *testing.T) {
if os.Getenv("CI_CLICKHOUSE_CLUSTER") == "" {
t.Skip()
}

e2eshared.RunSuite(t, SetupSuite(t, true, func(t *testing.T) (*e2e.PostgresSource, string, error) {
t.Helper()
suffix := "pgchcl_" + strings.ToLower(shared.RandomString(8))
Expand All @@ -59,6 +68,10 @@ func TestPeerFlowE2ETestSuitePG_CH_Cluster(t *testing.T) {
}

func TestPeerFlowE2ETestSuiteMySQL_CH_Cluster(t *testing.T) {
if os.Getenv("CI_CLICKHOUSE_CLUSTER") == "" || os.Getenv("CI_MYSQL_VERSION") == "" {
t.Skip()
}

e2eshared.RunSuite(t, SetupSuite(t, true, func(t *testing.T) (*e2e.MySqlSource, string, error) {
t.Helper()
suffix := "mychcl_" + strings.ToLower(shared.RandomString(8))
Expand Down
8 changes: 7 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"slices"
"strings"
"testing"
Expand Down Expand Up @@ -600,8 +601,13 @@ func NewTemporalClient(t *testing.T) client.Client {
),
))

hostport := os.Getenv("TEMPORAL_HOST_PORT")
if hostport == "" {
hostport = "localhost:7233"
}

tc, err := client.Dial(client.Options{
HostPort: "localhost:7233",
HostPort: hostport,
Logger: logger,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ func CDCFlowWorkflow(
"panic in sync flow",
slog.Any("error", panicErr.Error()),
slog.String("stack", panicErr.StackTrace()),
slog.Any("sleepFor", sleepFor),
slog.Duration("sleepFor", sleepFor),
)
} else {
// cannot use shared.IsSQLStateError because temporal serialize/deserialize
Expand Down
31 changes: 31 additions & 0 deletions local-ci/db.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/sh
# Bring up the e2e docker-compose stack, preferring docker but falling back
# to `podman compose` when `docker compose` is not available on PATH.
# Set USE_PODMAN=1 to force podman without probing.
set -Eeu

cd "$(dirname "$0")"/..

DOCKER="docker"
# Service names must match docker-compose-e2e.yml (temporal-e2e, temporal-ui-e2e).
EXTRA_ARGS="--no-attach temporal-e2e --no-attach temporal-ui-e2e"
PODMAN_ARGS=""

# Auto-detect the engine only when the user has not opted in explicitly.
# `command -v` + a real `compose version` probe replaces the previous
# `$(docker compose &>/dev/null)` construct, which executed the command's
# (empty) output and relied on the bash-only `&>` redirection under /bin/sh.
if test -z "${USE_PODMAN:=}"; then
    if ! docker compose version >/dev/null 2>&1; then
        if podman compose version >/dev/null 2>&1; then
            echo "docker compose could not be found on PATH, using podman compose"
            USE_PODMAN=1
        else
            echo "docker compose could not be found on PATH"
            exit 1
        fi
    fi
fi

if test -n "$USE_PODMAN"; then
    DOCKER="podman"
    EXTRA_ARGS=""
    PODMAN_ARGS="--podman-run-args=--replace"
fi

export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
exec $DOCKER compose $PODMAN_ARGS -f docker-compose-e2e.yml up --build $EXTRA_ARGS
17 changes: 17 additions & 0 deletions local-ci/env
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Host-side environment for running flow workers and e2e tests against the
# docker-compose-e2e.yml stack. Sourced by the sibling local-ci/*.sh scripts.

# MinIO (S3-compatible) staging credentials for the ClickHouse connector;
# values match the minio-e2e service definition.
export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID=_peerdb_minioadmin
export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY=_peerdb_minioadmin
export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION=us-east-1
export PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ENDPOINT_URL_S3=http://localhost:9001
export PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME=peerdbbucket

# Catalog Postgres, reached via the host port mapping (9901 -> 5432).
export PEERDB_CATALOG_HOST=localhost
export PEERDB_CATALOG_PORT=9901
export PEERDB_CATALOG_USER=postgres
export PEERDB_CATALOG_PASSWORD=postgres
export PEERDB_CATALOG_DATABASE=postgres

# Temporal frontend, exposed by the temporal-e2e service.
export TEMPORAL_HOST_PORT=localhost:7233
export ENABLE_OTEL_METRICS=false

# Generic AWS SDK credentials, pointed at the same local MinIO instance.
export AWS_ACCESS_KEY_ID=_peerdb_minioadmin
export AWS_SECRET_ACCESS_KEY=_peerdb_minioadmin
export AWS_REGION=us-east-1
export AWS_ENDPOINT_URL_S3=http://localhost:9001

# Matches CLICKHOUSE_PASSWORD in the clickhouse-e2e service.
export CI_CLICKHOUSE_PASSWORD=example
5 changes: 5 additions & 0 deletions local-ci/flow-worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh
# Run the PeerDB flow worker locally against the e2e compose stack.
# -e/-u: abort if a cd fails or the env file references unset variables,
# instead of silently running `go run` from the wrong directory.
set -eu
cd "$(dirname "$0")"
. ./env
cd ../flow
exec go run . worker
5 changes: 5 additions & 0 deletions local-ci/snapshot-worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh
# Run the PeerDB snapshot worker locally against the e2e compose stack.
# -e/-u: abort if a cd fails or the env file references unset variables,
# instead of silently running `go run` from the wrong directory.
set -eu
cd "$(dirname "$0")"
. ./env
cd ../flow
exec go run . snapshot-worker
5 changes: 5 additions & 0 deletions local-ci/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh
# Run the ClickHouse e2e suite serially (-parallel 1 -p 1) against the
# local compose stack; expects the workers and stack to already be running.
# -e/-u: abort if a cd fails instead of testing from the wrong directory.
set -eu
cd "$(dirname "$0")"
. ./env
cd ../flow
exec go test -v -parallel 1 -p 1 ./e2e/clickhouse/...
1 change: 1 addition & 0 deletions volumes/postgresql.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
listen_addresses = '*'

max_connections = 1000
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
Loading