
Commit dce532a

Merge pull request #611 from cmu-delphi/covidcast-logging
Covidcast logging
2 parents 81951e0 + 5b7ae88

File tree: 7 files changed (+53 −50 lines)

src/acquisition/covidcast/covidcast_meta_cache_updater.py

Lines changed: 3 additions & 3 deletions

@@ -48,18 +48,18 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
   if len(metadata)==0:
     args = ("no results",-2)
 
-  print('covidcast_meta result: %s (code %d)' % args)
+  logger.info('covidcast_meta result: %s (code %d)' % args)
 
   if args[-1] != 1:
-    print('unable to cache epidata')
+    logger.error('unable to cache epidata')
     return False
 
   # update the cache
   try:
     metadata_update_start_time = time.time()
     database.update_covidcast_meta_cache(metadata)
     metadata_update_interval_in_seconds = time.time() - metadata_update_start_time
-    print('successfully cached epidata')
+    logger.info('successfully cached epidata')
   finally:
     # no catch block so that an exception above will cause the program to
     # fail after the following cleanup

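Note: the logger calls above keep the message pre-formatted with the % operator, exactly as the print calls did. With the logging framework the arguments can also be passed separately, so formatting is deferred until the record is actually emitted. A small illustration using the standard library (the logger name below is illustrative, not the project's get_structured_logger):

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('covidcast_meta_cache_updater')

    args = ('success', 1)  # hypothetical (message, code) pair, as in the script

    # eager formatting, as in the diff above
    logger.info('covidcast_meta result: %s (code %d)' % args)

    # equivalent output, but the formatting is done by the logging framework
    logger.info('covidcast_meta result: %s (code %d)', *args)
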
src/acquisition/covidcast/csv_importer.py

Lines changed: 13 additions & 14 deletions

@@ -14,6 +14,7 @@
 # first party
 from delphi_utils import Nans
 from delphi.utils.epiweek import delta_epiweeks
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 class CsvImporter:
   """Finds and parses covidcast CSV files."""

@@ -84,16 +85,17 @@ def is_sane_week(value):
 
   @staticmethod
   def find_issue_specific_csv_files(scan_dir, glob=glob):
+    logger = get_structured_logger('find_issue_specific_csv_files')
     for path in sorted(glob.glob(os.path.join(scan_dir, '*'))):
       issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower())
       if issuedir_match and os.path.isdir(path):
         issue_date_value = int(issuedir_match.group(2))
         issue_date = CsvImporter.is_sane_day(issue_date_value)
         if issue_date:
-          print(' processing csv files from issue date: "' + str(issue_date) + '", directory', path)
+          logger.info('processing csv files from issue date: "' + str(issue_date) + '", directory', path)
           yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob)
         else:
-          print(' invalid issue directory day', issue_date_value)
+          logger.warning(event='invalid issue directory day', detail=issue_date_value, file=path)
 
   @staticmethod
   def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob):

@@ -105,7 +107,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
     valid, details is a tuple of (source, signal, time_type, geo_type,
     time_value, issue, lag) (otherwise None).
     """
-
+    logger = get_structured_logger('find_csv_files')
     issue_day,issue_epiweek=issue
     issue_day_value=int(issue_day.strftime("%Y%m%d"))
     issue_epiweek_value=int(str(issue_epiweek))

@@ -117,14 +119,11 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
       if not path.lower().endswith('.csv'):
         # safe to ignore this file
         continue
-
-      print('file:', path)
-
       # match a daily or weekly naming pattern
       daily_match = CsvImporter.PATTERN_DAILY.match(path.lower())
       weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower())
       if not daily_match and not weekly_match:
-        print(' invalid csv path/filename', path)
+        logger.warning(event='invalid csv path/filename', detail=path, file=path)
         yield (path, None)
         continue
 

@@ -135,7 +134,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
         match = daily_match
         time_value_day = CsvImporter.is_sane_day(time_value)
         if not time_value_day:
-          print(' invalid filename day', time_value)
+          logger.warning(event='invalid filename day', detail=time_value, file=path)
           yield (path, None)
           continue
         issue_value=issue_day_value

@@ -146,7 +145,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
         match = weekly_match
         time_value_week=CsvImporter.is_sane_week(time_value)
         if not time_value_week:
-          print(' invalid filename week', time_value)
+          logger.warning(event='invalid filename week', detail=time_value, file=path)
           yield (path, None)
           continue
         issue_value=issue_epiweek_value

@@ -155,15 +154,15 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
       # # extract and validate geographic resolution
       geo_type = match.group(3).lower()
       if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS:
-        print(' invalid geo_type', geo_type)
+        logger.warning(event='invalid geo_type', detail=geo_type, file=path)
         yield (path, None)
         continue
 
       # extract additional values, lowercased for consistency
       source = match.group(1).lower()
       signal = match.group(4).lower()
       if len(signal) > 64:
-        print(' invalid signal name (64 char limit)',signal)
+        logger.warning(event='invalid signal name (64 char limit)',detail=signal, file=path)
         yield (path, None)
         continue
 

@@ -344,19 +343,19 @@ def load_csv(filepath, geo_type, pandas=pandas):
     In case of a validation error, `None` is yielded for the offending row,
     including the header.
     """
-
+    logger = get_structured_logger('load_csv')
     # don't use type inference, just get strings
     table = pandas.read_csv(filepath, dtype='str')
 
     if not CsvImporter.is_header_valid(table.columns):
-      print(' invalid header')
+      logger.warning(event='invalid header', detail=table.columns, file=filepath)
       yield None
       return
 
     for row in table.itertuples(index=False):
      row_values, error = CsvImporter.extract_and_check_row(row, geo_type)
      if error:
-        print(' invalid value for %s (%s)' % (str(row), error))
+        logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath)
        yield None
        continue
      yield row_values

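The logger module these imports refer to (delphi.epidata.acquisition.covidcast.logger) is not shown in this diff. As a rough sketch of the behavior the new call sites assume — a named logger whose keyword arguments (event, detail, file, ...) come out as structured fields — one possible shape built on structlog is given below. This is an assumption for illustration, not the repository's actual implementation:

    # Hypothetical sketch of a structured-logger factory; assumes the
    # third-party structlog package is installed.
    import sys

    import structlog


    def get_structured_logger(name=__name__):
      structlog.configure(
        processors=[
          structlog.stdlib.add_log_level,               # adds "level": "warning", ...
          structlog.processors.TimeStamper(fmt='iso'),  # adds an ISO "timestamp" field
          structlog.processors.JSONRenderer(),          # renders one JSON object per event
        ],
        logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
      )
      return structlog.get_logger().bind(name=name)


    if __name__ == '__main__':
      logger = get_structured_logger('find_csv_files')
      # keyword-style call as used in csv_importer.py; each keyword becomes a field
      logger.warning(event='invalid geo_type', detail='block', file='some/path.csv')

With a JSON renderer, a call like the last line comes out as a single machine-parseable line rather than the loosely formatted strings the print calls produced.
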
src/acquisition/covidcast/csv_to_database.py

Lines changed: 21 additions & 21 deletions

@@ -37,25 +37,25 @@ def get_argument_parser():
     help="filename for log output (defaults to stdout)")
   return parser
 
-def collect_files(data_dir, specific_issue_date, csv_importer_impl=CsvImporter):
+def collect_files(data_dir, specific_issue_date,csv_importer_impl=CsvImporter):
   """Fetch path and data profile details for each file to upload."""
-
+  logger= get_structured_logger('collect_files')
   if specific_issue_date:
     results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir))
   else:
     results = list(csv_importer_impl.find_csv_files(os.path.join(data_dir, 'receiving')))
-  print(f'found {len(results)} files')
+  logger.info(f'found {len(results)} files')
   return results
 
 def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver):
   if specific_issue_date:
     # issue-specific uploads are always one-offs, so we can leave all
     # files in place without worrying about cleaning up
-    def handle_failed(path_src, filename, source):
-      print(f'leaving failed file alone - {source}')
+    def handle_failed(path_src, filename, source, logger):
+      logger.info(event='leaving failed file alone', dest=source, file=filename)
 
-    def handle_successful(path_src, filename, source):
-      print('archiving as successful')
+    def handle_successful(path_src, filename, source, logger):
+      logger.info(event='archiving as successful',file=filename)
       file_archiver_impl.archive_inplace(path_src, filename)
   else:
     # normal automation runs require some shuffling to remove files

@@ -64,15 +64,15 @@ def handle_successful(path_src, filename, source):
     archive_failed_dir = os.path.join(data_dir, 'archive', 'failed')
 
     # helper to archive a failed file without compression
-    def handle_failed(path_src, filename, source):
-      print('archiving as failed - '+source)
+    def handle_failed(path_src, filename, source, logger):
+      logger.info(event='archiving as failed - ', detail=source, file=filename)
       path_dst = os.path.join(archive_failed_dir, source)
       compress = False
       file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
 
     # helper to archive a successful file with compression
-    def handle_successful(path_src, filename, source):
-      print('archiving as successful')
+    def handle_successful(path_src, filename, source, logger):
+      logger.info(event='archiving as successful',file=filename)
       path_dst = os.path.join(archive_successful_dir, source)
       compress = True
       file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
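
make_handlers now builds callbacks that receive the logger as an explicit argument instead of printing. A compact, self-contained illustration of that callback pattern (stdlib logging stands in for the project's structured logger; the names are illustrative):

    import logging

    logging.basicConfig(level=logging.INFO, format='%(message)s')


    def make_handlers(one_off):
      """Return (handle_successful, handle_failed) callbacks that log their work."""
      if one_off:
        def handle_failed(path_src, filename, source, logger):
          logger.info('leaving failed file alone: %s (source %s)', filename, source)
      else:
        def handle_failed(path_src, filename, source, logger):
          logger.info('archiving as failed: %s (source %s)', filename, source)

      def handle_successful(path_src, filename, source, logger):
        logger.info('archiving as successful: %s', filename)

      return handle_successful, handle_failed


    logger = logging.getLogger('csv_to_database')
    archive_as_successful, archive_as_failed = make_handlers(one_off=True)
    archive_as_failed('/data/receiving', 'bad.csv', 'unknown', logger)

Passing the logger through the handler signature keeps the nested functions free of module-level state, which is why the call sites in upload_archive below now supply it as an extra argument.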
@@ -101,15 +101,14 @@ def upload_archive(
   """
   archive_as_successful, archive_as_failed = handlers
   total_modified_row_count = 0
-
   # iterate over each file
   for path, details in path_details:
-    print('handling ', path)
+    logger.info(event='handling',dest=path)
     path_src, filename = os.path.split(path)
 
     if not details:
       # file path or name was invalid, source is unknown
-      archive_as_failed(path_src, filename, 'unknown')
+      archive_as_failed(path_src, filename, 'unknown',logger)
       continue
 
     (source, signal, time_type, geo_type, time_value, issue, lag) = details

@@ -130,7 +129,7 @@ def upload_archive(
     if all_rows_valid:
       try:
         modified_row_count = database.insert_or_update_bulk(rows_list)
-        print(f"insert_or_update_bulk {filename} returned {modified_row_count}")
+        logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}")
         logger.info(
           "Inserted database rows",
           row_count = modified_row_count,

@@ -145,14 +144,14 @@ def upload_archive(
         database.commit()
       except Exception as e:
         all_rows_valid = False
-        print('exception while inserting rows:', e)
+        logger.exception('exception while inserting rows:', e)
         database.rollback()
 
     # archive the current file based on validation results
     if all_rows_valid:
-      archive_as_successful(path_src, filename, source)
+      archive_as_successful(path_src, filename, source, logger)
     else:
-      archive_as_failed(path_src, filename, source)
+      archive_as_failed(path_src, filename, source,logger)
 
   return total_modified_row_count

@@ -168,7 +167,7 @@ def main(
   start_time = time.time()
 
   if args.is_wip_override and args.not_wip_override:
-    print('conflicting overrides for forcing WIP option! exiting...')
+    logger.error('conflicting overrides for forcing WIP option! exiting...')
     return
   wip_override = None
   if args.is_wip_override:

@@ -179,7 +178,7 @@ def main(
   # shortcut escape without hitting db if nothing to do
   path_details = collect_files_impl(args.data_dir, args.specific_issue_date)
   if not path_details:
-    print('nothing to do; exiting...')
+    logger.info('nothing to do; exiting...')
     return
 
   logger.info("Ingesting CSVs", csv_count = len(path_details))

@@ -195,7 +194,8 @@ def main(
       logger,
       is_wip_override=wip_override)
     logger.info("Finished inserting database rows", row_count = modified_row_count)
-    print('inserted/updated %d rows' % modified_row_count)
+    # the following print statement serves the same function as the logger.info call above
+    # print('inserted/updated %d rows' % modified_row_count)
   finally:
     # unconditionally commit database changes since CSVs have been archived
     database.disconnect(True)

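One refinement the structured style makes easy (not part of this change, purely an illustration): binding per-file context once, so every event inside the loop automatically carries the filename instead of repeating it on each call. A sketch assuming a structlog-style logger with bind(); the field names are illustrative:

    # Assumes the third-party structlog package is installed.
    import structlog

    structlog.configure(processors=[structlog.processors.JSONRenderer()])
    logger = structlog.get_logger()

    for path in ['receiving/src1/20200601_county_sig.csv']:
      file_logger = logger.bind(file=path)  # every event below now includes "file"
      file_logger.info('handling')
      file_logger.info('Inserted database rows', row_count=42)
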
src/acquisition/covidcast/database.py

Lines changed: 6 additions & 5 deletions

@@ -16,6 +16,7 @@
 # first party
 import delphi.operations.secrets as secrets
 
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 class CovidcastRow():
   """A container for all the values of a single covidcast row."""

@@ -248,7 +249,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
 
   def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
     """Compute and return metadata on all non-WIP COVIDcast signals."""
-
+    logger = get_structured_logger("compute_covidcast_meta")
     index_hint = ""
     if use_index:
       index_hint = "USE INDEX (for_metadata)"

@@ -304,7 +305,7 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
     meta_lock = threading.Lock()
 
     def worker():
-      print("starting thread: " + threading.current_thread().name)
+      logger.info("starting thread: " + threading.current_thread().name)
       # set up new db connection for thread
       worker_dbc = Database()
       worker_dbc.connect(connector_impl=self._connector_impl)

@@ -319,7 +320,7 @@ def worker():
           ))
           srcsigs.task_done()
       except Empty:
-        print("no jobs left, thread terminating: " + threading.current_thread().name)
+        logger.info("no jobs left, thread terminating: " + threading.current_thread().name)
       finally:
         worker_dbc.disconnect(False) # cleanup
 

@@ -330,10 +331,10 @@ def worker():
       threads.append(t)
 
     srcsigs.join()
-    print("jobs complete")
+    logger.info("jobs complete")
     for t in threads:
       t.join()
-    print("threads terminated")
+    logger.error("threads terminated")
 
     # sort the metadata because threaded workers dgaf
     sorting_fields = "data_source signal time_type geo_type".split()

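compute_covidcast_meta logs from several worker threads at once. A minimal, self-contained sketch of that pattern — workers draining a shared queue and reporting their own thread name — using stdlib logging as a stand-in for the structured logger:

    import logging
    import queue
    import threading

    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logger = logging.getLogger('compute_covidcast_meta')

    srcsigs = queue.Queue()
    for source_signal in [('src1', 'sig1'), ('src1', 'sig2'), ('src2', 'sig1')]:
      srcsigs.put(source_signal)


    def worker():
      name = threading.current_thread().name
      logger.info('starting thread: %s', name)
      try:
        while True:
          source, signal = srcsigs.get(block=False)
          # ... compute metadata for (source, signal) here ...
          srcsigs.task_done()
      except queue.Empty:
        logger.info('no jobs left, thread terminating: %s', name)


    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
      t.start()
    srcsigs.join()
    logger.info('jobs complete')
    for t in threads:
      t.join()
    logger.info('threads terminated')
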
src/acquisition/covidcast/file_archiver.py

Lines changed: 4 additions & 1 deletion

@@ -5,6 +5,8 @@
 import os
 import shutil
 
+# first party
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 class FileArchiver:
   """Archives files by moving and compressing."""

@@ -40,6 +42,7 @@ def archive_file(
     file already exists, it will be overwritten.
     """
 
+    logger = get_structured_logger("file_archiver")
     src = os.path.join(path_src, filename)
     dst = os.path.join(path_dst, filename)
 

@@ -51,7 +54,7 @@ def archive_file(
 
     if os.path.exists(dst):
       # warn that destination is about to be overwritten
-      print('destination exists, will overwrite (%s)' % dst)
+      logger.warning(event='destination exists, will overwrite', file=dst)
 
     if compress:
       # make a compressed copy

src/acquisition/covidcast/fill_is_latest_issue.py

Lines changed: 5 additions & 2 deletions

@@ -8,7 +8,7 @@
 
 # first party
 import delphi.operations.secrets as secrets
-
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 # partition configuration
 ###PARTITION_VARIABLE = 'geo_value'

@@ -34,6 +34,9 @@
 
 def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION):
 
+
+  logger = get_structured_logger("fill_is_lastest_issue")
+
   u, p = secrets.db.epi
   connection = mysql.connector.connect(
     host=secrets.db.host,

@@ -94,7 +97,7 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITI
     commit = True
   except Exception as e:
     connection.rollback()
-    print("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
+    logger.exception("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
     raise e
   finally:
     cursor.close()

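Replacing print with logger.exception in the except block also captures the traceback, not just the message. A small illustration of that behavior (stdlib logging shown; the project's structured logger is assumed to expose the same .exception() semantics):

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('fill_is_latest_issue')

    partition_index, partition_split, partition_variable = 3, 20200401, 'time_value'

    try:
      raise RuntimeError('simulated database failure')
    except Exception:
      # logs at ERROR level and appends the current traceback automatically
      logger.exception(
        'exception raised at partition %s (partition index #%s) of column `%s`',
        partition_split, partition_index, partition_variable)
      # the script re-raises afterwards so the failure still propagates
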
src/acquisition/covidcast/generate_islatest_fix_sql.py

Lines changed: 1 addition & 4 deletions

@@ -14,8 +14,6 @@
       for typ in ('num', 'prop'):
         signals.append(case+period+count+typ)
 ### signals = ['sig2'] ###
-
-
 # variable to split on, 'time_value' is good because its high cardinality is suitable for chunking
 PARTITION_VARIABLE = 'time_value'
 PARTITION_SPLITS = [20200101 + i*100 for i in range(10)] # first day of the month for jan - oct 2020 in YYYYMMDD form

@@ -40,7 +38,6 @@
   ge_condition = 'TRUE' if partition_index == 0 else f'`{PARTITION_VARIABLE}` >= {PARTITION_SPLITS[partition_index - 1]}'
   l_condition = 'TRUE' if partition_index == len(PARTITION_SPLITS) else f'`{PARTITION_VARIABLE}` < {PARTITION_SPLITS[partition_index]}'
   partition_condition = f'({ge_condition}) AND ({l_condition})'
-
   for sig in signals:
     where_clause = base_where_clause + " AND `signal`='%s' AND %s" % (sig, partition_condition)
 

@@ -63,4 +60,4 @@
 
 # clean up temp table
 print("-- TODO: drop this table")
-print("-- DROP TABLE `islatest_fix`;")
+print("-- DROP TABLE `islatest_fix`;")
