Skip to content

Commit 85fab33

Browse files
committed
#1394 : added retry for pipeline_validation.sh
1 parent f016234 commit 85fab33

File tree

1 file changed

+84
-55
lines changed

1 file changed

+84
-55
lines changed

e2e-tests/pipeline_validation.sh

Lines changed: 84 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121

2222
set -e
2323

24+
# -------------------------------------------------------------------
25+
# Shared helper for robust Parquet row-count with retry/back-off
26+
# -------------------------------------------------------------------
27+
source "$(dirname "$0")/lib/parquet_utils.sh"
28+
2429
#################################################
2530
# Prints the usage
2631
#################################################
@@ -65,7 +70,7 @@ function validate_args() {
6570
exit 1
6671
fi
6772

68-
if [[ ! -d ${1}/${2} ]]; then
73+
if [[ ! -d "${1}/${2}" ]]; then
6974
echo "The directory ${1}/${2} does not exist."
7075
usage
7176
exit 1
@@ -106,8 +111,8 @@ function setup() {
106111
PARQUET_SUBDIR=$2
107112
FHIR_JSON_SUBDIR=$3
108113
SINK_FHIR_SERVER_URL=$4
109-
rm -rf "${HOME_PATH}/${FHIR_JSON_SUBDIR}"
110-
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}/*.json"
114+
rm -rf "${HOME_PATH:?}/${FHIR_JSON_SUBDIR:?}"
115+
rm -rf "${HOME_PATH}/${PARQUET_SUBDIR}"/*.json
111116
find "${HOME_PATH}/${PARQUET_SUBDIR}" -size 0 -delete
112117
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
113118
STREAMING=""
@@ -200,65 +205,86 @@ function test_parquet_sink() {
200205
# This global variable is hardcoded to validate the View record count
201206
# which can greater than the number of Resources in the source FHIR
202207
# Server due to flattening
203-
PATIENT_VIEW_ROWCOUNT=528
204-
OBS_VIEW_ROWCOUNT=${TOTAL_TEST_OBS}
208+
209+
local patient_view_expect=528
210+
local obs_view_expect="${TOTAL_TEST_OBS}"
211+
205212
if [[ -n ${OPENMRS} ]]; then
206-
PATIENT_VIEW_ROWCOUNT=108
207-
OBS_VIEW_ROWCOUNT=284379
213+
patient_view_expect=108
214+
obs_view_expect=284379
208215
fi
209216

210-
211217
print_message "Counting number of patients, encounters and obs sinked to parquet files"
212-
local total_patients_streamed=$(java -Xms16g -Xmx16g -jar \
213-
./controller-spark/parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" | \
214-
awk '{print $3}')
215-
print_message "Total patients synced to parquet ---> ${total_patients_streamed}"
216218

217-
local total_encounters_streamed=$(java -Xms16g -Xmx16g -jar \
218-
./controller-spark/parquet-tools-1.11.1.jar rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" \
219-
| awk '{print $3}')
220-
print_message "Total encounters synced to parquet ---> ${total_encounters_streamed}"
219+
local total_patients_streamed
220+
total_patients_streamed=$(retry_rowcount \
221+
"${HOME_PATH}/${PARQUET_SUBDIR}/Patient/" \
222+
"${TOTAL_TEST_PATIENTS}" \
223+
"patients") || true
224+
print_message "Total patients synced to Parquet ---> ${total_patients_streamed}"
225+
226+
local total_encounters_streamed
227+
total_encounters_streamed=$(retry_rowcount \
228+
"${HOME_PATH}/${PARQUET_SUBDIR}/Encounter/" \
229+
"${TOTAL_TEST_ENCOUNTERS}" \
230+
"encounters") || true
231+
print_message "Total encounters synced to Parquet ---> ${total_encounters_streamed}"
232+
233+
local total_obs_streamed
234+
total_obs_streamed=$(retry_rowcount \
235+
"${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" \
236+
"${TOTAL_TEST_OBS}" \
237+
"observations") || true
238+
print_message "Total obs synced to Parquet ---> ${total_obs_streamed}"
239+
240+
if [[ -z ${STREAMING} ]]; then
241+
print_message "Parquet Sink Test Non-Streaming mode"
221242

222-
local total_obs_streamed=$(java -Xms16g -Xmx16g -jar ./controller-spark/parquet-tools-1.11.1.jar \
223-
rowcount "${HOME_PATH}/${PARQUET_SUBDIR}/Observation/" | awk '{print $3}')
224-
print_message "Total obs synced to parquet ---> ${total_obs_streamed}"
243+
local total_patient_flat
244+
total_patient_flat=$(retry_rowcount \
245+
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/patient_flat/" \
246+
"${patient_view_expect}" \
247+
"patient_flat") || true
248+
print_message "Total patient-flat rows synced ---> ${total_patient_flat}"
249+
250+
local total_encounter_flat
251+
total_encounter_flat=$(retry_rowcount \
252+
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/encounter_flat/" \
253+
"${TOTAL_TEST_ENCOUNTERS}" \
254+
"encounter_flat") || true
255+
print_message "Total encounter-flat rows synced ---> ${total_encounter_flat}"
256+
257+
local total_obs_flat
258+
total_obs_flat=$(retry_rowcount \
259+
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/observation_flat/" \
260+
"${obs_view_expect}" \
261+
"observation_flat") || true
262+
print_message "Total observation-flat rows synced ---> ${total_obs_flat}"
263+
fi
225264

226-
if [[ ! (-n ${STREAMING}) ]]; then
227-
print_message "Parquet Sink Test Non-Streaming mode"
228-
local total_patient_flat=$(java -Xms16g -Xmx16g -jar \
229-
./controller-spark/parquet-tools-1.11.1.jar rowcount \
230-
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/patient_flat/" | \
231-
awk '{print $3}')
232-
print_message "Total patient flat rows synced to parquet ---> ${total_patient_flat}"
233-
234-
local total_encounter_flat=$(java -Xms16g -Xmx16g -jar \
235-
./controller-spark/parquet-tools-1.11.1.jar rowcount \
236-
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/encounter_flat/" \
237-
| awk '{print $3}')
238-
print_message "Total encounter flat rows synced to parquet ---> ${total_encounter_flat}"
239-
240-
local total_obs_flat=$(java -Xms16g -Xmx16g -jar \
241-
./controller-spark/parquet-tools-1.11.1.jar rowcount \
242-
"${HOME_PATH}/${PARQUET_SUBDIR}/VIEWS_TIMESTAMP_*/observation_flat/" \
243-
| awk '{print $3}')
244-
print_message "Total observation flat rows synced to parquet ---> ${total_obs_flat}"
245-
246-
if (( total_patients_streamed == TOTAL_TEST_PATIENTS && total_encounters_streamed \
247-
== TOTAL_TEST_ENCOUNTERS && total_obs_streamed == TOTAL_TEST_OBS \
248-
&& total_obs_flat == OBS_VIEW_ROWCOUNT && \
249-
total_patient_flat == PATIENT_VIEW_ROWCOUNT && \
250-
total_encounter_flat == TOTAL_TEST_ENCOUNTERS )) ; then
251-
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
252-
else
253-
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
254-
exit 1
265+
# Success criteria
266+
if [[ -z ${STREAMING} ]]; then
267+
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && \
268+
"${total_encounters_streamed}" == "${TOTAL_TEST_ENCOUNTERS}" && \
269+
"${total_obs_streamed}" == "${TOTAL_TEST_OBS}" && \
270+
"${total_patient_flat}" == "${patient_view_expect}" && \
271+
"${total_encounter_flat}" == "${TOTAL_TEST_ENCOUNTERS}" && \
272+
"${total_obs_flat}" == "${obs_view_expect}" ]]; then
273+
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
274+
else
275+
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
276+
exit 1
255277
fi
256-
elif (( total_patients_streamed == TOTAL_TEST_PATIENTS && total_encounters_streamed \
257-
== TOTAL_TEST_ENCOUNTERS && total_obs_streamed == TOTAL_TEST_OBS )) ; then
258-
print_message "PARQUET SINK EXECUTED SUCCESSFULLY USING ${PARQUET_SUBDIR} MODE"
278+
else
279+
# streaming mode: flat views not produced
280+
if [[ "${total_patients_streamed}" == "${TOTAL_TEST_PATIENTS}" && \
281+
"${total_encounters_streamed}" == "${TOTAL_TEST_ENCOUNTERS}" && \
282+
"${total_obs_streamed}" == "${TOTAL_TEST_OBS}" ]]; then
283+
print_message "PARQUET SINK SUCCESSFUL using ${PARQUET_SUBDIR} mode"
259284
else
260-
print_message "PARQUET SINK TEST FAILED USING ${PARQUET_SUBDIR} MODE"
285+
print_message "PARQUET SINK FAILED using ${PARQUET_SUBDIR} mode"
261286
exit 1
287+
fi
262288
fi
263289
}
264290

@@ -308,13 +334,16 @@ function test_fhir_sink() {
308334

309335
print_message "Counting number of patients, encounters and obs sinked to fhir files"
310336

311-
local total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/patients.json")
337+
local total_patients_sinked_fhir
338+
total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/patients.json")
312339
print_message "Total patients sinked to fhir ---> ${total_patients_sinked_fhir}"
313340

314-
local total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/encounters.json")
341+
local total_encounters_sinked_fhir
342+
total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/encounters.json")
315343
print_message "Total encounters sinked to fhir ---> ${total_encounters_sinked_fhir}"
316344

317-
local total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/obs.json")
345+
local total_obs_sinked_fhir
346+
total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${FHIR_JSON_SUBDIR}/fhir/obs.json")
318347
print_message "Total observations sinked to fhir ---> ${total_obs_sinked_fhir}"
319348

320349
if [[ "${total_patients_sinked_fhir}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters_sinked_fhir}" \

0 commit comments

Comments
 (0)