From b686f903e9c05cda3a8367c29a61b623e7cdd738 Mon Sep 17 00:00:00 2001
From: John Bulcher
Date: Thu, 19 Nov 2020 16:25:23 -0800
Subject: [PATCH 1/3] add metadata fields

Add _sdc_version, _sdc_extracted_at and _sdc_deleted_at fields.

This allows querying for current data when using an append-only
strategy in BigQuery.
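For example, the latest non-deleted copy of each record can then be
selected with a window function over the new columns. A minimal sketch
(the "my-project" project, "mydataset.orders" table, and key column
"id" are hypothetical placeholders):

    # Sketch only: pick the newest non-deleted copy of each record
    # from an append-only table. All names below are placeholders.
    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")

    query = """
        SELECT * EXCEPT (_row_rank)
        FROM (
            SELECT
                *,
                -- newest version, then newest extraction, wins per key
                ROW_NUMBER() OVER (
                    PARTITION BY id
                    ORDER BY _sdc_version DESC, _sdc_extracted_at DESC
                ) AS _row_rank
            FROM `my-project.mydataset.orders`
        )
        WHERE _row_rank = 1            -- keep only the current copy
          AND _sdc_deleted_at IS NULL  -- drop records marked deleted
    """

    for row in client.query(query).result():
        print(dict(row))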
---
 target_bigquery.py | 65 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 56 insertions(+), 9 deletions(-)

diff --git a/target_bigquery.py b/target_bigquery.py
index 111d11f..eade687 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -94,10 +94,42 @@ def define_schema(field, name):
 
     return (schema_name, schema_type, schema_mode, schema_description, schema_fields)
 
-def build_schema(schema):
+def build_tap_metadata_schema():
+    metadata_fields = {
+        "properties": {
+            "_sdc_version": {
+                "inclusion": "available",
+                "minimum": -2147483648,
+                "maximum": 2147483647,
+                "type": [
+                    "null",
+                    "integer",
+                ]
+            },
+            "_sdc_extracted_at": {
+                "inclusion": "available",
+                "format": "date-time",
+                "type": [
+                    "null",
+                    "string",
+                ],
+            },
+            "_sdc_deleted_at": {
+                "inclusion": "available",
+                "format": "date-time",
+                "type": [
+                    "null",
+                    "string",
+                ],
+            },
+        }
+    }
+    return build_schema(metadata_fields, add_metadata_fields=False)
+
+def build_schema(schema, add_metadata_fields=False):
     SCHEMA = []
     for key in schema['properties'].keys():
-
+
         if not (bool(schema['properties'][key])): # if we endup with an empty record.
             continue
 
@@ -105,9 +105,11 @@
         schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
         SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))
 
+    if add_metadata_fields:
+        SCHEMA += build_tap_metadata_schema()
     return SCHEMA
 
-def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
+def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True, add_tap_metadata=False):
     state = None
     schemas = {}
     key_properties = {}
@@ -138,6 +172,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
             if validate_records:
                 validate(msg.record, schema)
 
+            if add_tap_metadata:
+                msg.record['_sdc_version'] = msg.version
+                msg.record['_sdc_extracted_at'] = str(msg.time_extracted)
+
             # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
             dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')
 
@@ -154,7 +192,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
             table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
-            #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
+            #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table], add_tap_metadata))
             rows[table] = TemporaryFile(mode='w+b')
             errors[table] = None
             # try:
@@ -171,7 +209,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
 
     for table in rows.keys():
         table_ref = bigquery_client.dataset(dataset_id).table(table)
-        SCHEMA = build_schema(schemas[table])
+        SCHEMA = build_schema(schemas[table], add_tap_metadata)
         load_config = LoadJobConfig()
         load_config.schema = SCHEMA
         load_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
@@ -195,7 +233,7 @@
 
     return state
 
-def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
+def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True, add_tap_metadata=False):
     state = None
     schemas = {}
     key_properties = {}
@@ -228,6 +266,10 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             if validate_records:
                 validate(msg.record, schema)
 
+            if add_tap_metadata:
+                msg.record['_sdc_version'] = msg.version
+                msg.record['_sdc_extracted_at'] = str(msg.time_extracted)
+
             errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
             rows[msg.stream] += 1
 
@@ -241,7 +283,7 @@
             table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
-            tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
+            tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table], add_tap_metadata))
             rows[table] = 0
             errors[table] = None
             try:
@@ -298,14 +340,19 @@ def main():
     else:
         truncate = False
 
+    if config.get('add_tap_metadata'):
+        add_tap_metadata = True
+    else:
+        add_tap_metadata = False
+
     validate_records = config.get('validate_records', True)
 
     input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
 
     if config.get('stream_data', True):
-        state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
+        state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records, add_tap_metadata=add_tap_metadata)
     else:
-        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)
+        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records, add_tap_metadata=add_tap_metadata)
 
     emit_state(state)
     logger.debug("Exiting normally")
From 7502dc8ec68da5cabdd00821d4322b043c546dda Mon Sep 17 00:00:00 2001
From: John Bulcher
Date: Thu, 19 Nov 2020 16:31:03 -0800
Subject: [PATCH 2/3] ignore direnv

---
 .gitignore | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 71fce44..0ed42c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,6 +82,9 @@ celerybeat-schedule
 # dotenv
 .env
 
+# direnv
+.envrc
+
 # virtualenv
 .venv
 venv/
@@ -100,4 +103,4 @@ ENV/
 
 # mypy
 .mypy_cache/
-.vscode/
\ No newline at end of file
+.vscode/

From 77c924467a20627bd7789e15e250d5618e5760a8 Mon Sep 17 00:00:00 2001
From: John Bulcher
Date: Thu, 19 Nov 2020 16:51:14 -0800
Subject: [PATCH 3/3] add option to config sample

---
 config.sample.json | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/config.sample.json b/config.sample.json
index 365f1c5..cdbc541 100644
--- a/config.sample.json
+++ b/config.sample.json
@@ -2,5 +2,6 @@
   "project_id": "bigquery-public-data",
   "dataset_id": "samples",
   "table_id": "github_timeline",
-  "validate_records": true
+  "validate_records": true,
+  "add_tap_metadata": false
 }