13 changes: 11 additions & 2 deletions e2e-tests/Dockerfile
@@ -14,12 +14,21 @@

FROM maven:3.8.7-eclipse-temurin-17-focal

##############################################################################################
# NOTE: This Docker image is designed to run exclusively in the Cloud Build environment,
# where the entire repository is automatically mounted at /workspace.
# Scripts (pipeline_validation.sh, lib/parquet_utils.sh)
# and JARs (parquet-tools-1.11.1.jar) are accessed directly from the mounted repository
# at runtime, so we do **NOT** COPY them into the image.
#
# If you ever need to run this image **outside** Cloud Build, you must bind-mount the repo
# to /workspace or manually copy the required files into the container.
##############################################################################################

RUN apt-get update && apt-get install -y jq python3.8 python3-pip
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.8 1
RUN pip3 install virtualenv google-auth requests

COPY pipeline_validation.sh pipeline_validation.sh
COPY controller-spark/parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar
ENV PARQUET_SUBDIR="NON_JDBC"
ENV FHIR_JSON_SUBDIR="FHIR_JSON"
ENV DOCKER_NETWORK="--use_docker_network"
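If you do need to run this image outside Cloud Build, a minimal local invocation could look like the sketch below; the `e2e-tests:local` tag and launching from the repository root are assumptions for illustration, not part of the PR:

```bash
# Build from the repo root (tag name is hypothetical).
docker build -t e2e-tests:local -f e2e-tests/Dockerfile e2e-tests

# Bind-mount the repository at /workspace so the scripts and the
# parquet-tools JAR resolve at runtime exactly as they do in Cloud Build.
docker run --rm -v "$(pwd)":/workspace e2e-tests:local
```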
13 changes: 11 additions & 2 deletions e2e-tests/controller-spark/Dockerfile
@@ -14,12 +14,21 @@

FROM bitnami/spark:3.3

##############################################################################################
# NOTE: This Docker image is designed to run exclusively in the Cloud Build environment,
# where the entire repository is automatically mounted at /workspace.
# Scripts (controller_spark_sql_validation.sh, lib/parquet_utils.sh)
# and JARs (parquet-tools-1.11.1.jar) are accessed directly from the mounted repository
# at runtime, so we do **NOT** COPY them into the image.
#
# If you ever need to run this image **outside** Cloud Build, you must bind-mount the repo
# to /workspace or manually copy the required files into the container.
##############################################################################################

USER root
RUN apt-get update && apt-get install -y jq curl python3 python3-pip
RUN pip3 install virtualenv google-auth requests

COPY controller_spark_sql_validation.sh controller_spark_sql_validation.sh
COPY parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar
ENV PARQUET_SUBDIR="dwh"
ENV DOCKER_NETWORK="--use_docker_network"
ENV HOME_DIR="/workspace/e2e-tests/controller-spark"
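The same caveat applies to this image. A hedged sketch of a local run, additionally overriding the retry cadence through the env vars that `lib/parquet_utils.sh` honors (the `controller-spark-e2e:local` tag is hypothetical):

```bash
docker build -t controller-spark-e2e:local \
  -f e2e-tests/controller-spark/Dockerfile e2e-tests/controller-spark

# HOME_DIR already points inside /workspace, so only the bind mount is needed.
docker run --rm -v "$(pwd)":/workspace \
  -e ROWCOUNT_MAX_RETRIES=3 -e ROWCOUNT_SLEEP_SECS=5 \
  controller-spark-e2e:local
```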
15 changes: 9 additions & 6 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -25,6 +25,8 @@ set -e
# -------------------------------------------------------------------
source "$(dirname "$0")/../lib/parquet_utils.sh"

PARQUET_TOOLS_JAR=""

#################################################
# Prints the usage
#################################################
@@ -104,6 +106,7 @@ function setup() {
SINK_FHIR_SERVER_URL='http://localhost:8098'
PIPELINE_CONTROLLER_URL='http://localhost:8090'
THRIFTSERVER_URL='localhost:10001'
PARQUET_TOOLS_JAR="${HOME_PATH}/parquet-tools-1.11.1.jar"
if [[ $3 = "--use_docker_network" ]]; then
SOURCE_FHIR_SERVER_URL='http://hapi-server:8080'
SINK_FHIR_SERVER_URL='http://sink-server-controller:8080'
@@ -252,37 +255,37 @@ function check_parquet() {
total_patients=$(retry_rowcount \
"${output}/*/Patient/" \
"${TOTAL_TEST_PATIENTS}" \
"patients") || true
"${PARQUET_TOOLS_JAR}") || true

local total_encounters
total_encounters=$(retry_rowcount \
"${output}/*/Encounter/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"encounters") || true
"${PARQUET_TOOLS_JAR}") || true

local total_observations
total_observations=$(retry_rowcount \
"${output}/*/Observation/" \
"${TOTAL_TEST_OBS}" \
"observations") || true
"${PARQUET_TOOLS_JAR}") || true

local total_patient_flat
total_patient_flat=$(retry_rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/patient_flat/" \
"${TOTAL_VIEW_PATIENTS}" \
"patient_flat") || true
"${PARQUET_TOOLS_JAR}") || true

local total_encounter_flat
total_encounter_flat=$(retry_rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/encounter_flat/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"encounter_flat") || true
"${PARQUET_TOOLS_JAR}") || true

local total_obs_flat
total_obs_flat=$(retry_rowcount \
"${output}/*/VIEWS_TIMESTAMP_*/observation_flat/" \
"${TOTAL_TEST_OBS}" \
"observation_flat") || true
"${PARQUET_TOOLS_JAR}") || true
# ------------------------------------------------------------------

print_message "Total patients: ${total_patients}"
31 changes: 19 additions & 12 deletions e2e-tests/lib/parquet_utils.sh
@@ -4,36 +4,43 @@

set -euo pipefail

# retry_rowcount <path> <expected> <label>
# path – shell glob pointing to a Parquet folder (wildcards allowed).
# The glob is passed verbatim to parquet-tools, which understands
# Hadoop-style wild-cards (e.g. "…/*/Patient/").
# expected – integer row count we expect to see.
# label – short metric name for log messages.
# retry_rowcount <path> <expected> <parquet_jar>
# path – shell glob pointing to a Parquet folder (wildcards allowed).
# The glob is passed verbatim to parquet-tools, which understands
# Hadoop-style wild-cards (e.g. "…/*/Patient/").
# expected – integer row count we expect to see.
# parquet_jar – full path to the parquet-tools JAR file.
#
# Prints the final count on stdout.

retry_rowcount() {
local parquet_glob="$1"
local expected="$2"
local label="$3"
local parquet_tools_jar="$3"

# CI can override cadence through env vars
local max_retries="${ROWCOUNT_MAX_RETRIES:-5}"
local sleep_secs="${ROWCOUNT_SLEEP_SECS:-5}"
local max_retries="${ROWCOUNT_MAX_RETRIES:-15}"
local sleep_secs="${ROWCOUNT_SLEEP_SECS:-20}"

local retries=0
local raw_count=0
local final_count=0

# Verify JAR exists
if [[ ! -f "$parquet_tools_jar" ]]; then
echo "E2E TEST ERROR: parquet-tools JAR not found at: $parquet_tools_jar" >&2
echo "0"
return
fi

while true; do
# ── 1. Ask parquet-tools for a row count
raw_count=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
raw_count=$(java -Xms16g -Xmx16g -jar "${parquet_tools_jar}" rowcount \
"${parquet_glob}" 2>/dev/null | awk '{print $3}')

# ── 2. Normalise raw_count
if [[ -z "${raw_count}" || ! "${raw_count}" =~ ^[0-9]+$ ]]; then
echo "E2E TEST ERROR: [${label}] parquet-tools returned '${raw_count}' " \
echo "E2E TEST ERROR: [${parquet_glob}] parquet-tools returned '${raw_count}' " \
"(treating as 0)" >&2
final_count=0
else
@@ -57,7 +64,7 @@

# ── 6. Sleep & retry
retries=$((retries + 1))
echo "E2E TEST: [${label}] raw=${raw_count}, expected=${expected} — retry ${retries}/${max_retries} in ${sleep_secs}s" >&2
echo "E2E TEST: [${parquet_glob}] raw=${raw_count}, expected=${expected} — retry ${retries}/${max_retries} in ${sleep_secs}s" >&2
sleep "${sleep_secs}"
done
}
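For reference, a standalone call to the new signature would look like the sketch below. The expected count of 79 and the paths are illustrative only; the `awk '{print $3}'` in the helper assumes parquet-tools reports a line of the form `Total RowCount: <n>`:

```bash
# Assumes the repo is mounted at /workspace and Parquet output lives under /workspace/dwh.
source /workspace/e2e-tests/lib/parquet_utils.sh

# Optionally tighten the cadence for a quick local check.
export ROWCOUNT_MAX_RETRIES=3
export ROWCOUNT_SLEEP_SECS=5

count=$(retry_rowcount \
  "/workspace/dwh/*/Patient/" \
  79 \
  "/workspace/e2e-tests/controller-spark/parquet-tools-1.11.1.jar") || true
echo "Patient rows: ${count}"
```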
148 changes: 90 additions & 58 deletions e2e-tests/pipeline_validation.sh
@@ -21,6 +21,13 @@

set -e

# -------------------------------------------------------------------
# Shared helper for robust Parquet row-count with retry/back-off
# -------------------------------------------------------------------
source "$(dirname "$0")/lib/parquet_utils.sh"

PARQUET_TOOLS_JAR=""

#################################################
# Prints the usage
#################################################
@@ -65,7 +72,7 @@ function validate_args() {
exit 1
fi

if [[ ! -d ${1}/${2} ]]; then
if [[ ! -d "${1}/${2}" ]]; then
echo "The directory ${1}/${2} does not exist."
usage
exit 1
@@ -106,21 +113,22 @@ function setup() {
PARQUET_SUBDIR=$2
FHIR_JSON_SUBDIR=$3
SINK_FHIR_SERVER_URL=$4
rm -rf "${HOME_PATH}/${FHIR_JSON_SUBDIR}"
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}/*.json"
PARQUET_TOOLS_JAR="${HOME_PATH}/controller-spark/parquet-tools-1.11.1.jar"
rm -rf "${HOME_PATH:?}/${FHIR_JSON_SUBDIR:?}"
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}"/*.json
find "${HOME_PATH}/${PARQUET_SUBDIR}" -size 0 -delete
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
STREAMING=""
OPENMRS=""

# TODO: We should refactor this code to parse the arguments by going through
# each one and checking which ones are turned on.
if [[ $5 = "--openmrs" ]] || [[ $6 = "--openmrs" ]] || [[ $7 = "--openmrs" ]]; then
if [[ "${5:-}" = "--openmrs" ]] || [[ "${6:-}" = "--openmrs" ]] || [[ "${7:-}" = "--openmrs" ]]; then
OPENMRS="on"
SOURCE_FHIR_SERVER_URL='http://localhost:8099/openmrs/ws/fhir2/R4'
fi

if [[ $5 = "--use_docker_network" ]] || [[ $6 = "--use_docker_network" ]] || [[ $7 = "--use_docker_network" ]]; then
if [[ "${5:-}" = "--use_docker_network" ]] || [[ "${6:-}" = "--use_docker_network" ]] || [[ "${7:-}" = "--use_docker_network" ]]; then
if [[ -n ${OPENMRS} ]]; then
SOURCE_FHIR_SERVER_URL='http://openmrs:8080/openmrs/ws/fhir2/R4'
else
@@ -130,7 +138,7 @@

# TODO: the streaming mode is currently not tested as it was removed; we have
# kept this logic around since we may add streaming mode in the Beam pipeline.
if [[ $5 = "--streaming" ]] || [[ $6 = "--streaming" ]] || [[ $7 = "--streaming" ]]; then
if [[ "${5:-}" = "--streaming" ]] || [[ "${6:-}" = "--streaming" ]] || [[ "${7:-}" = "--streaming" ]]; then
STREAMING="on"
fi
}
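A minimal sketch of the loop-based parsing the TODO in `setup()` suggests: iterate over the trailing arguments instead of probing fixed positions. The `USE_DOCKER_NETWORK` flag variable is hypothetical; the URL selection would then key off it the way the current code keys off the positional checks:

```bash
# Inside setup(): the first four positional args stay as-is.
local arg
shift 4
for arg in "$@"; do
  case "${arg}" in
    --openmrs)            OPENMRS="on" ;;
    --streaming)          STREAMING="on" ;;
    --use_docker_network) USE_DOCKER_NETWORK="on" ;;
    *) echo "Unknown option: ${arg}" >&2; usage; exit 1 ;;
  esac
done
```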
@@ -200,65 +208,86 @@ function test_parquet_sink() {
# These values are hardcoded to validate the View record count,
# which can be greater than the number of Resources in the source FHIR
# Server due to flattening
PATIENT_VIEW_ROWCOUNT=528
OBS_VIEW_ROWCOUNT=${TOTAL_TEST_OBS}

local patient_view_expect=528
local obs_view_expect="${TOTAL_TEST_OBS}"

if [[ -n ${OPENMRS} ]]; then
PATIENT_VIEW_ROWCOUNT=108
OBS_VIEW_ROWCOUNT=284379
patient_view_expect=108
obs_view_expect=284379
fi


print_message "Counting number of patients, encounters and obs sinked to parquet files"
local total_patients_streamed=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" | \
awk '{print $3}')
print_message "Total patients synced to parquet ---> ${total_patients_streamed}"

local total_encounters_streamed=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" \
| awk '{print $3}')
print_message "Total encounters synced to parquet ---> ${total_encounters_streamed}"
local total_patients_streamed
total_patients_streamed=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" \
"${TOTAL_TEST_PATIENTS}" \
"${PARQUET_TOOLS_JAR}") || true
print_message "Total patients synced to Parquet ---> ${total_patients_streamed}"

local total_encounters_streamed
total_encounters_streamed=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"${PARQUET_TOOLS_JAR}") || true
print_message "Total encounters synced to Parquet ---> ${total_encounters_streamed}"

local total_obs_streamed
total_obs_streamed=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" \
"${TOTAL_TEST_OBS}" \
"${PARQUET_TOOLS_JAR}") || true
print_message "Total obs synced to Parquet ---> ${total_obs_streamed}"

if [[ -z ${STREAMING} ]]; then
print_message "Parquet Sink Test Non-Streaming mode"

local total_obs_streamed=$(java -Xms16g -Xmx16g -jar ./controller-spark/parquet-tools-1.11.1.jar \
rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" | awk '{print $3}')
print_message "Total obs synced to parquet ---> ${total_obs_streamed}"
local total_patient_flat
total_patient_flat=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/patient_flat/" \
"${patient_view_expect}" \
"${PARQUET_TOOLS_JAR}") || true
print_message "Total patient-flat rows synced ---> ${total_patient_flat}"

local total_encounter_flat
total_encounter_flat=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/encounter_flat/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"${PARQUET_TOOLS_JAR}") || true
print_message "Total encounter-flat rows synced ---> ${total_encounter_flat}"

local total_obs_flat
total_obs_flat=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/observation_flat/" \
"${obs_view_expect}" \
"${PARQUET_TOOLS_JAR}") || true
print_message "Total observation-flat rows synced ---> ${total_obs_flat}"
fi

if [[ ! (-n ${STREAMING}) ]]; then
print_message "Parquet Sink Test Non-Streaming mode"
local total_patient_flat=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/patient_flat/" | \
awk '{print $3}')
print_message "Total patient flat rows synced to parquet ---> ${total_patient_flat}"

local total_encounter_flat=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/encounter_flat/" \
| awk '{print $3}')
print_message "Total encounter flat rows synced to parquet ---> ${total_encounter_flat}"

local total_obs_flat=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/observation_flat/" \
| awk '{print $3}')
print_message "Total observation flat rows synced to parquet ---> ${total_obs_flat}"

if (( total_patients_streamed == TOTAL_TEST_PATIENTS && total_encounters_streamed \
== TOTAL_TEST_ENCOUNTERS && total_obs_streamed == TOTAL_TEST_OBS \
&& total_obs_flat == OBS_VIEW_ROWCOUNT && \
total_patient_flat == PATIENT_VIEW_ROWCOUNT && \
total_encounter_flat == TOTAL_TEST_ENCOUNTERS )) ; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
exit 1
# Success criteria
if [[ -z ${STREAMING} ]]; then
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && \
"${total_encounters_streamed}" == "${TOTAL_TEST_ENCOUNTERS}" && \
"${total_obs_streamed}" == "${TOTAL_TEST_OBS}" && \
"${total_patient_flat}" == "${patient_view_expect}" && \
"${total_encounter_flat}" == "${TOTAL_TEST_ENCOUNTERS}" && \
"${total_obs_flat}" == "${obs_view_expect}" ]]; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
exit 1
fi
elif (( total_patients_streamed == TOTAL_TEST_PATIENTS && total_encounters_streamed \
== TOTAL_TEST_ENCOUNTERS && total_obs_streamed == TOTAL_TEST_OBS )) ; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
# streaming mode: flat views not produced
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && \
"${total_encounters_streamed}" == "${TOTAL_TEST_ENCOUNTERS}" && \
"${total_obs_streamed}" == "${TOTAL_TEST_OBS}" ]]; then
print_message "PARQUET SINK SUCCESSFUL using ${PARQUET_SUBDIR} mode"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
print_message "PARQUET SINK FAILED using ${PARQUET_SUBDIR} mode"
exit 1
fi
fi
}

@@ -308,13 +337,16 @@ function test_fhir_sink() {

print_message "Counting number of patients, encounters and obs sinked to fhir files"

local total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/patients.json")
local total_patients_sinked_fhir
total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/patients.json")
print_message "Total patients sinked to fhir ---> ${total_patients_sinked_fhir}"

local total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/encounters.json")
local total_encounters_sinked_fhir
total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/encounters.json")
print_message "Total encounters sinked to fhir ---> ${total_encounters_sinked_fhir}"

local total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/obs.json")
local total_obs_sinked_fhir
total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/obs.json")
print_message "Total observations sinked to fhir ---> ${total_obs_sinked_fhir}"

if [[ "${total_patients_sinked_fhir}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters_sinked_fhir}" \
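The `jq '.total'` extraction relies on each file holding a FHIR search-result Bundle, whose top-level `total` field carries the match count. A toy illustration with a fabricated Bundle:

```bash
cat > /tmp/patients.json <<'EOF'
{"resourceType": "Bundle", "type": "searchset", "total": 79, "entry": []}
EOF
jq '.total' /tmp/patients.json   # prints 79
```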