diff --git a/.gitignore b/.gitignore
index 71fce44..0ed42c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,6 +82,9 @@ celerybeat-schedule
 # dotenv
 .env
 
+# direnv
+.envrc
+
 # virtualenv
 .venv
 venv/
@@ -100,4 +103,4 @@ ENV/
 # mypy
 .mypy_cache/
 
-.vscode/
\ No newline at end of file
+.vscode/
diff --git a/config.sample.json b/config.sample.json
index 365f1c5..cdbc541 100644
--- a/config.sample.json
+++ b/config.sample.json
@@ -2,5 +2,6 @@
     "project_id": "bigquery-public-data",
     "dataset_id": "samples",
     "table_id": "github_timeline",
-    "validate_records": true
+    "validate_records": true,
+    "add_tap_metadata": false
 }
diff --git a/target_bigquery.py b/target_bigquery.py
index 111d11f..eade687 100644
--- a/target_bigquery.py
+++ b/target_bigquery.py
@@ -94,10 +94,42 @@ def define_schema(field, name):
 
     return (schema_name, schema_type, schema_mode, schema_description, schema_fields)
 
-def build_schema(schema):
+def build_tap_metadata_schema():
+    metadata_fields = {
+        "properties": {
+            "_sdc_version": {
+                "inclusion": "available",
+                "minimum": -2147483648,
+                "maximum": 2147483647,
+                "type": [
+                    "null",
+                    "integer",
+                ]
+            },
+            "_sdc_extracted_at": {
+                "inclusion": "available",
+                "format": "date-time",
+                "type": [
+                    "null",
+                    "string",
+                ],
+            },
+            "_sdc_deleted_at": {
+                "inclusion": "available",
+                "format": "date-time",
+                "type": [
+                    "null",
+                    "string",
+                ],
+            },
+        }
+    }
+    return build_schema(metadata_fields, add_metadata_fields=False)
+
+def build_schema(schema, add_metadata_fields=False):
     SCHEMA = []
     for key in schema['properties'].keys():
-
+
         if not (bool(schema['properties'][key])):
             # if we endup with an empty record.
             continue
@@ -105,9 +137,11 @@ def build_schema(schema):
         schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(schema['properties'][key], key)
         SCHEMA.append(SchemaField(schema_name, schema_type, schema_mode, schema_description, schema_fields))
 
+    if add_metadata_fields:
+        SCHEMA += build_tap_metadata_schema()
     return SCHEMA
 
-def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
+def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True, add_tap_metadata=False):
     state = None
     schemas = {}
     key_properties = {}
@@ -138,6 +172,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
             if validate_records:
                 validate(msg.record, schema)
 
+            if add_tap_metadata:
+                msg.record['_sdc_version'] = msg.version
+                msg.record['_sdc_extracted_at'] = str(msg.time_extracted)
+
             # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
             dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')
 
@@ -154,7 +192,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
             table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
-            #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
+            #tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table], add_tap_metadata))
             rows[table] = TemporaryFile(mode='w+b')
             errors[table] = None
             # try:
@@ -171,7 +209,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
 
     for table in rows.keys():
         table_ref = bigquery_client.dataset(dataset_id).table(table)
-        SCHEMA = build_schema(schemas[table])
+        SCHEMA = build_schema(schemas[table], add_tap_metadata)
         load_config = LoadJobConfig()
         load_config.schema = SCHEMA
         load_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
@@ -195,7 +233,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida
 
     return state
 
-def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
+def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True, add_tap_metadata=False):
     state = None
     schemas = {}
     key_properties = {}
@@ -228,6 +266,10 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             if validate_records:
                 validate(msg.record, schema)
 
+            if add_tap_metadata:
+                msg.record['_sdc_version'] = msg.version
+                msg.record['_sdc_extracted_at'] = str(msg.time_extracted)
+
             errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
             rows[msg.stream] += 1
 
@@ -241,7 +283,7 @@ def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=Tr
             table = msg.stream
             schemas[table] = msg.schema
             key_properties[table] = msg.key_properties
-            tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table]))
+            tables[table] = bigquery.Table(dataset.table(table), schema=build_schema(schemas[table], add_tap_metadata))
             rows[table] = 0
             errors[table] = None
             try:
@@ -298,14 +340,19 @@ def main():
     else:
         truncate = False
 
+    if config.get('add_tap_metadata'):
+        add_tap_metadata = True
+    else:
+        add_tap_metadata = False
+
     validate_records = config.get('validate_records', True)
 
     input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
 
     if config.get('stream_data', True):
-        state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
+        state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records, add_tap_metadata=add_tap_metadata)
     else:
-        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)
+        state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records, add_tap_metadata=add_tap_metadata)
 
     emit_state(state)
     logger.debug("Exiting normally")
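
For reference, a minimal sketch (not part of the diff) of how the new add_metadata_fields flag changes the schema that build_schema returns. The stream schema below is made up for illustration, and the snippet assumes target_bigquery.py is importable from the working directory:

    from target_bigquery import build_schema

    # A made-up Singer stream schema with two ordinary columns.
    stream_schema = {
        "properties": {
            "id": {"type": ["null", "integer"]},
            "name": {"type": ["null", "string"]},
        }
    }

    # Default behaviour: only the stream's own columns are generated.
    print([field.name for field in build_schema(stream_schema)])
    # ['id', 'name']

    # With the flag: the _sdc_* metadata columns are appended to the schema.
    print([field.name for field in build_schema(stream_schema, add_metadata_fields=True)])
    # ['id', 'name', '_sdc_version', '_sdc_extracted_at', '_sdc_deleted_at']

When add_tap_metadata is enabled in the config, persist_lines_job and persist_lines_stream also stamp each RECORD message with _sdc_version and _sdc_extracted_at before it is written to BigQuery; the diff never populates _sdc_deleted_at itself, so that column stays NULL unless the incoming record already carries it.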