Merged
25 changes: 23 additions & 2 deletions e2e-tests/lib/parquet_utils.sh
@@ -19,16 +19,37 @@ retry_rowcount() {
local label="$3"

# CI can override cadence through env vars
local max_retries="${ROWCOUNT_MAX_RETRIES:-5}"
local max_retries="${ROWCOUNT_MAX_RETRIES:-12}"
local sleep_secs="${ROWCOUNT_SLEEP_SECS:-5}"

local retries=0
local raw_count=0
local final_count=0

# JAR discovery logic
if [[ -z "${PARQUET_TOOLS_JAR:-}" ]]; then
# Search up to three levels above this script (first match wins).
local this_dir
this_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

PARQUET_TOOLS_JAR="$(find "$this_dir"/.. -maxdepth 3 -name 'parquet-tools-*.jar' 2>/dev/null | head -n1)"
PARQUET_TOOLS_JAR="${PARQUET_TOOLS_JAR:-./parquet-tools-1.11.1.jar}" # legacy relative path
echo "E2E TEST DEBUG: using parquet JAR => $PARQUET_TOOLS_JAR" >&2
export PARQUET_TOOLS_JAR
fi

local parquet_tools_jar="$PARQUET_TOOLS_JAR"

if [[ ! -f "$parquet_tools_jar" ]]; then
echo "E2E TEST ERROR: parquet-tools JAR not found at: $parquet_tools_jar" >&2
echo "E2E TEST ERROR: Set PARQUET_TOOLS_JAR environment variable to override." >&2
echo "0"
return
fi

while true; do
# ── 1. Ask parquet-tools for a row count
raw_count=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \
raw_count=$(java -Xms16g -Xmx16g -jar "${parquet_tools_jar}" rowcount \
"${parquet_glob}" 2>/dev/null | awk '{print $3}')

# ── 2. Normalise raw_count
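A rough usage sketch of the new helper, based on what this hunk shows; the glob and expected count below are illustrative, not taken from the PR. retry_rowcount appears to take a Parquet path glob, an expected row count, and a logging label, poll parquet-tools until the count matches or the retry budget runs out, and echo the last count it saw; ROWCOUNT_MAX_RETRIES and ROWCOUNT_SLEEP_SECS override the cadence.

source e2e-tests/lib/parquet_utils.sh

# Optional CI overrides for the retry cadence (new defaults: 12 retries, 5s sleep).
export ROWCOUNT_MAX_RETRIES=20
export ROWCOUNT_SLEEP_SECS=10

# Illustrative glob and expected count; the third argument is only used for log messages.
patient_rows=$(retry_rowcount "./dwh/Patient/" "79" "patients") || true
echo "observed Patient rows: ${patient_rows}"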
145 changes: 87 additions & 58 deletions e2e-tests/pipeline_validation.sh
@@ -21,6 +21,11 @@

set -e

# -------------------------------------------------------------------
# Shared helper for robust Parquet row-count with retry/back-off
# -------------------------------------------------------------------
source "$(dirname "$0")/lib/parquet_utils.sh"

#################################################
# Prints the usage
#################################################
@@ -65,7 +70,7 @@ function validate_args() {
exit 1
fi

if [[ ! -d ${1}/${2} ]]; then
if [[ ! -d "${1}/${2}" ]]; then
echo "The directory ${1}/${2} does not exist."
usage
exit 1
@@ -106,21 +111,21 @@ function setup() {
PARQUET_SUBDIR=$2
FHIR_JSON_SUBDIR=$3
SINK_FHIR_SERVER_URL=$4
rm -rf "${HOME_PATH}/${FHIR_JSON_SUBDIR}"
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}/*.json"
rm -rf "${HOME_PATH:?}/${FHIR_JSON_SUBDIR:?}"
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}"/*.json
find "${HOME_PATH}/${PARQUET_SUBDIR}" -size 0 -delete
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
STREAMING=""
OPENMRS=""

# TODO: We should refactor this code to parse the arguments by going through
# each one and checking which ones are turned on.
if [[ $5 = "--openmrs" ]] || [[ $6 = "--openmrs" ]] || [[ $7 = "--openmrs" ]]; then
if [[ "${5:-}" = "--openmrs" ]] || [[ "${6:-}" = "--openmrs" ]] || [[ "${7:-}" = "--openmrs" ]]; then
OPENMRS="on"
SOURCE_FHIR_SERVER_URL='http://localhost:8099/openmrs/ws/fhir2/R4'
fi

if [[ $5 = "--use_docker_network" ]] || [[ $6 = "--use_docker_network" ]] || [[ $7 = "--use_docker_network" ]]; then
if [[ "${5:-}" = "--use_docker_network" ]] || [[ "${6:-}" = "--use_docker_network" ]] || [[ "${7:-}" = "--use_docker_network" ]]; then
if [[ -n ${OPENMRS} ]]; then
SOURCE_FHIR_SERVER_URL='http://openmrs:8080/openmrs/ws/fhir2/R4'
else
@@ -130,7 +135,7 @@

# TODO: the streaming mode is currently not tested as it was removed; we have
# kept this logic around since we may add streaming mode in the Beam pipeline.
if [[ $5 = "--streaming" ]] || [[ $6 = "--streaming" ]] || [[ $7 = "--streaming" ]]; then
if [[ "${5:-}" = "--streaming" ]] || [[ "${6:-}" = "--streaming" ]] || [[ "${7:-}" = "--streaming" ]]; then
STREAMING="on"
fi
}
@@ -200,65 +205,86 @@ function test_parquet_sink() {
# This global variable is hardcoded to validate the View record count
# which can be greater than the number of Resources in the source FHIR
# Server due to flattening
PATIENT_VIEW_ROWCOUNT=528
OBS_VIEW_ROWCOUNT=${TOTAL_TEST_OBS}

local patient_view_expect=528
local obs_view_expect="${TOTAL_TEST_OBS}"

if [[ -n ${OPENMRS} ]]; then
PATIENT_VIEW_ROWCOUNT=108
OBS_VIEW_ROWCOUNT=284379
patient_view_expect=108
obs_view_expect=284379
fi


print_message "Counting number of patients, encounters and obs sinked to parquet files"
local total_patients_streamed=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" | \
awk '{print $3}')
print_message "Total patients synced to parquet ---> ${total_patients_streamed}"

local total_encounters_streamed=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" \
| awk '{print $3}')
print_message "Total encounters synced to parquet ---> ${total_encounters_streamed}"
local total_patients_streamed
total_patients_streamed=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" \
"${TOTAL_TEST_PATIENTS}" \
"patients") || true
print_message "Total patients synced to Parquet ---> ${total_patients_streamed}"

local total_encounters_streamed
total_encounters_streamed=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"encounters") || true
print_message "Total encounters synced to Parquet ---> ${total_encounters_streamed}"

local total_obs_streamed
total_obs_streamed=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" \
"${TOTAL_TEST_OBS}" \
"observations") || true
print_message "Total obs synced to Parquet ---> ${total_obs_streamed}"

if [[ -z ${STREAMING} ]]; then
print_message "Parquet Sink Test Non-Streaming mode"

local total_obs_streamed=$(java -Xms16g -Xmx16g -jar ./controller-spark/parquet-tools-1.11.1.jar \
rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" | awk '{print $3}')
print_message "Total obs synced to parquet ---> ${total_obs_streamed}"
local total_patient_flat
total_patient_flat=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/patient_flat/" \
"${patient_view_expect}" \
"patient_flat") || true
print_message "Total patient-flat rows synced ---> ${total_patient_flat}"

local total_encounter_flat
total_encounter_flat=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/encounter_flat/" \
"${TOTAL_TEST_ENCOUNTERS}" \
"encounter_flat") || true
print_message "Total encounter-flat rows synced ---> ${total_encounter_flat}"

local total_obs_flat
total_obs_flat=$(retry_rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/observation_flat/" \
"${obs_view_expect}" \
"observation_flat") || true
print_message "Total observation-flat rows synced ---> ${total_obs_flat}"
fi

if [[ ! (-n ${STREAMING}) ]]; then
print_message "Parquet Sink Test Non-Streaming mode"
local total_patient_flat=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/patient_flat/" | \
awk '{print $3}')
print_message "Total patient flat rows synced to parquet ---> ${total_patient_flat}"

local total_encounter_flat=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/encounter_flat/" \
| awk '{print $3}')
print_message "Total encounter flat rows synced to parquet ---> ${total_encounter_flat}"

local total_obs_flat=$(java -Xms16g -Xmx16g -jar \
./controller-spark/parquet-tools-1.11.1.jar rowcount \
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/observation_flat/" \
| awk '{print $3}')
print_message "Total observation flat rows synced to parquet ---> ${total_obs_flat}"

if (( total_patients_streamed == TOTAL_TEST_PATIENTS && total_encounters_streamed \
== TOTAL_TEST_ENCOUNTERS && total_obs_streamed == TOTAL_TEST_OBS \
&& total_obs_flat == OBS_VIEW_ROWCOUNT && \
total_patient_flat == PATIENT_VIEW_ROWCOUNT && \
total_encounter_flat == TOTAL_TEST_ENCOUNTERS )) ; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
exit 1
# Success criteria
if [[ -z ${STREAMING} ]]; then
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && \
"${total_encounters_streamed}" == "${TOTAL_TEST_ENCOUNTERS}" && \
"${total_obs_streamed}" == "${TOTAL_TEST_OBS}" && \
"${total_patient_flat}" == "${patient_view_expect}" && \
"${total_encounter_flat}" == "${TOTAL_TEST_ENCOUNTERS}" && \
"${total_obs_flat}" == "${obs_view_expect}" ]]; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
exit 1
fi
elif (( total_patients_streamed == TOTAL_TEST_PATIENTS && total_encounters_streamed \
== TOTAL_TEST_ENCOUNTERS && total_obs_streamed == TOTAL_TEST_OBS )) ; then
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
else
# streaming mode: flat views not produced
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && \
"${total_encounters_streamed}" == "${TOTAL_TEST_ENCOUNTERS}" && \
"${total_obs_streamed}" == "${TOTAL_TEST_OBS}" ]]; then
print_message "PARQUET SINK SUCCESSFUL using ${PARQUET_SUBDIR} mode"
else
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
print_message "PARQUET SINK FAILED using ${PARQUET_SUBDIR} mode"
exit 1
fi
fi
}
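For context, a hypothetical CI invocation that exercises these checks. Only the JAR path and the environment variable names come from this PR; the positional arguments (home path, Parquet subdir, FHIR JSON subdir, sink FHIR server URL) and their values are illustrative.

# Point the shared helper at a known JAR and set the polling cadence before
# running the end-to-end validation (argument values are illustrative).
export PARQUET_TOOLS_JAR="./controller-spark/parquet-tools-1.11.1.jar"
export ROWCOUNT_MAX_RETRIES=12
export ROWCOUNT_SLEEP_SECS=5

./e2e-tests/pipeline_validation.sh /workspace dwh fhir-json \
  http://localhost:8098/fhir --use_docker_network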

@@ -308,13 +334,16 @@ function test_fhir_sink() {

print_message "Counting number of patients, encounters and obs sinked to fhir files"

local total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/patients.json")
local total_patients_sinked_fhir
total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/patients.json")
print_message "Total patients sinked to fhir ---> ${total_patients_sinked_fhir}"

local total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/encounters.json")
local total_encounters_sinked_fhir
total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/encounters.json")
print_message "Total encounters sinked to fhir ---> ${total_encounters_sinked_fhir}"

local total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/obs.json")
local total_obs_sinked_fhir
total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/obs.json")
print_message "Total observations sinked to fhir ---> ${total_obs_sinked_fhir}"

if [[ "${total_patients_sinked_fhir}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters_sinked_fhir}" \
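The jq '.total' extraction in test_fhir_sink assumes each *.json file is a FHIR searchset Bundle whose "total" field carries the server-side resource count. A tiny sketch (the bundle content is made up):

cat > /tmp/patients.json <<'EOF'
{"resourceType": "Bundle", "type": "searchset", "total": 79}
EOF
jq '.total' /tmp/patients.json   # prints: 79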