15 changes: 15 additions & 0 deletions environments/postgres-database-docker-compose.yml
@@ -0,0 +1,15 @@
version: '3.8'
services:
db:
image: postgres:14.1-alpine
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
ports:
- '5432:5432'
volumes:
- db:/var/lib/postgresql/data
volumes:
db:
driver: local
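
For local development, the new write_data_to_postgresql methods added below can be pointed at this compose service. A minimal sketch of the matching credentials dict, assuming the stack is running on localhost (the database name "postgres" is the image default):

pg_credentials = {
    "database": "postgres",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": 5432,
}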
10 changes: 10 additions & 0 deletions turbo_stream/__init__.py
@@ -149,3 +149,13 @@ def write_date_to_local(self, file_location):
"""
logging.info(f"Writing data to local path: {file_location}.")
write_file(data=self._data_set, file_location=file_location)


class _WriterInterface:
"""
Turbo Stream Writer Class Interface
"""

def __init__(self, credentials: (dict, str), configuration: dict = None, **kwargs):
self._configuration: dict = configuration
self._credentials: (dict, str) = credentials
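
A minimal sketch of how a concrete writer is expected to plug into this interface, mirroring the _PostgreSQLWriter added later in this PR (the _ExampleWriter name is purely illustrative):

from turbo_stream import _WriterInterface

class _ExampleWriter(_WriterInterface):
    """Hypothetical writer, shown only to illustrate the interface."""

    def __init__(self, credentials: (dict, str), configuration: dict = None, **kwargs):
        # credentials first, then the optional configuration, matching the interface
        super().__init__(credentials, configuration, **kwargs)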
59 changes: 56 additions & 3 deletions turbo_stream/google_analyitcs/reader.py
@@ -5,6 +5,8 @@
import logging
from socket import timeout

from turbo_stream.postgresql.writer import _PostgreSQLWriter

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.service_account import ServiceAccountCredentials
@@ -147,9 +149,14 @@ def _iterate_report(self, reports, view_id):
else:
row_dict[metric.get("name")] = int(value)

# add additional data
row_dict["ga:viewId"] = view_id
self._data_set.append(row_dict)
# add additional data & clean up field names
row_dict["viewId"] = view_id
new_row_dict = {}

for key, value in row_dict.items():
new_row_dict[key.replace("ga:", "")] = value

self._data_set.append(new_row_dict)

def run_query(self):
"""
@@ -190,3 +197,49 @@ def run_query(self):

logging.info(f"{self.__class__.__name__} process complete!")
return self._data_set

def write_data_to_postgresql(
self,
credentials: dict,
table_name: str,
truncate_on_insert: bool = False,
deduplicate: bool = False,
):
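        """
        Creates a PostgreSQL table derived from the configured dimensions and
        metrics (VARCHAR dimensions, NUMERIC metrics, plus a viewId column)
        and inserts the queried dataset into it.
        :param credentials: PostgreSQL connection credentials passed to _PostgreSQLWriter.
        :param table_name: The table to create and write to.
        :param truncate_on_insert: Bool to clear out the table before inserting the dataset.
        :param deduplicate: Bool to drop duplicate rows after the insert.
        """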
_writer = _PostgreSQLWriter(credentials=credentials)

_dimensions = [
dim.replace("ga:", "") for dim in self._configuration.get("dimensions", [])
]
_metrics = [
met.replace("ga:", "") for met in self._configuration.get("metrics", [])
]

_schema = {
"viewId": {
"type": "VARCHAR",
"not_null": True,
}
}

for _metric in _metrics:
_schema[_metric] = {
"type": "NUMERIC",
"not_null": True,
}

for _dimension in _dimensions:
_schema[_dimension] = {
"type": "VARCHAR",
"not_null": True,
}

_writer._create_table(table_name=table_name, schema=_schema)

_writer._insert_table(
table_name=table_name,
dataset=self._data_set,
truncate_on_insert=truncate_on_insert,
)

if deduplicate:
_writer._drop_duplicates(table_name=table_name, schema=_schema)
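
A hedged usage sketch of the new method; the GoogleAnalyticsReader class name, its constructor arguments and the configuration values are assumptions, since only this hunk of the reader is visible:

from turbo_stream.google_analyitcs.reader import GoogleAnalyticsReader

reader = GoogleAnalyticsReader(
    credentials="ga_service_account.json",
    configuration={
        "dimensions": ["ga:date", "ga:deviceCategory"],
        "metrics": ["ga:users", "ga:sessions"],
    },
)
reader.run_query()
reader.write_data_to_postgresql(
    credentials={"host": "localhost", "user": "postgres", "password": "postgres"},
    table_name="ga_report",
    truncate_on_insert=True,
    deduplicate=True,
)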
58 changes: 58 additions & 0 deletions turbo_stream/google_search_console/reader.py
@@ -9,6 +9,8 @@
from googleapiclient.errors import HttpError
from oauth2client.client import OAuth2WebServerFlow

from turbo_stream.postgresql.writer import _PostgreSQLWriter

from turbo_stream import ReaderInterface, write_file, write_file_to_s3
from turbo_stream.utils.date_handlers import date_range
from turbo_stream.utils.request_handlers import request_handler, retry_handler
@@ -216,3 +218,59 @@ def write_partition_data_to_s3(
key=f"{path}/{partition_name}_{dimension}.{fmt}",
data=partition_data,
)

def write_data_to_postgresql(
self,
credentials: dict,
table_name: str,
truncate_on_insert: bool = False,
deduplicate: bool = False,
):
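        """
        Creates one PostgreSQL table per configured dimension (base site_url,
        search_type and date fields plus the configured metrics and that
        dimension) and inserts the matching partition of the dataset into it.
        :param credentials: PostgreSQL connection credentials passed to _PostgreSQLWriter.
        :param table_name: Base table name; each table is suffixed with its dimension.
        :param truncate_on_insert: Bool to clear out each table before inserting the dataset.
        :param deduplicate: Bool to drop duplicate rows after the insert.
        """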
_writer = _PostgreSQLWriter(credentials=credentials)
_dimensions = self._configuration.get("dimensions", [])
_metrics = self._configuration.get("metrics", [])

        # for gsc, each dimension gets its own table: the base schema plus the
        # configured metrics and that single dimension, written as separate
        # per-dimension tables in an RDS setting
        _schemas = {}
for _dimension in _dimensions:
_schema = {
"site_url": {
"type": "VARCHAR",
"not_null": True,
},
"search_type": {
"type": "VARCHAR",
"not_null": True,
},
}

for _metric in _metrics:
_schema[_metric] = {
"type": "NUMERIC",
"not_null": True,
}

_schema[_dimension] = {
"type": "VARCHAR",
"not_null": True,
}

if "date" not in _schema:
_schema["date"] = {
"type": "VARCHAR",
"not_null": True,
}

            _writer._create_table(
                table_name=f"{table_name}_{_dimension}", schema=_schema
            )
            # keep each schema so deduplication can target the right table and fields
            _schemas[_dimension] = _schema

        for _dimension, _dimension_dataset in self._data_set[0].items():
            _writer._insert_table(
                table_name=f"{table_name}_{_dimension}",
                dataset=_dimension_dataset,
                truncate_on_insert=truncate_on_insert,
            )

            if deduplicate:
                _writer._drop_duplicates(
                    table_name=f"{table_name}_{_dimension}",
                    schema=_schemas[_dimension],
                )
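
As with the Google Analytics reader, a hedged sketch of how this is expected to be called; the GoogleSearchConsoleReader name, its constructor arguments and the configuration values are assumptions:

from turbo_stream.google_search_console.reader import GoogleSearchConsoleReader

gsc_reader = GoogleSearchConsoleReader(
    credentials="gsc_credentials.pickle",
    configuration={
        "dimensions": ["query", "page"],
        "metrics": ["clicks", "impressions"],
    },
)
# ...fetch the report with the reader's query method first...
# one table per dimension is written, e.g. gsc_report_query and gsc_report_page
gsc_reader.write_data_to_postgresql(
    credentials={"host": "localhost", "user": "postgres", "password": "postgres"},
    table_name="gsc_report",
    deduplicate=True,
)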
201 changes: 201 additions & 0 deletions turbo_stream/postgresql/writer.py
@@ -0,0 +1,201 @@
"""
Writer class for PostgreSQL that suits the expectations of the turbo stream services.
"""
import logging

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from turbo_stream import _WriterInterface

logging.basicConfig(
format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s", level=logging.INFO
)


class _PostgreSQLWriter(_WriterInterface):
def __init__(self, credentials: (dict, str), configuration: dict = None, **kwargs):
        super().__init__(credentials, configuration, **kwargs)

self._credentials = credentials
self._configuration = configuration

self._test_cursor()

def _connect(self):
"""
Establish connection to PostgreSQL database.
The basic connection parameters are:
- *dbname*: the database name
- *database*: the database name (only as keyword argument)
- *user*: user name used to authenticate (defaults to postgres if not provided)
- *password*: password used to authenticate
- *host*: database host address (defaults to UNIX socket if not provided)
- *port*: connection port number (defaults to 5432 if not provided)
"""
        logging.info("Establishing a connection to the PostgreSQL database.")

if "dbname" in self._credentials:
return psycopg2.connect(
dbname=self._credentials.get("dbname"),
user=self._credentials.get("user", "postgres"),
password=self._credentials.get("password"),
host=self._credentials.get("host"),
port=self._credentials.get("port", 5432),
)

return psycopg2.connect(
database=self._credentials.get("database", "postgres"),
user=self._credentials.get("user", "postgres"),
password=self._credentials.get("password"),
host=self._credentials.get("host", "localhost"),
port=self._credentials.get("port", 5432),
)

    def _test_cursor(self):
        """
        Test the PostgreSQL connection by running a lightweight query,
        raising the underlying OperationalError if the connection fails.
        """
# test the connection before returning it
try:
self._execute_query("SELECT 1")
logging.info("Connection to PostgreSQL successful.")

except psycopg2.OperationalError as err:
logging.info("Connection to PostgreSQL failed.")
raise err

def _execute_query(self, query, dataset=None):
        # some PostgreSQL commands, such as CREATE DATABASE or VACUUM, cannot run
        # inside a transaction, so autocommit is enabled for every statement
_connection = self._connect()
_connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
_cursor = _connection.cursor()
logging.info(query)
if dataset is None:
_cursor.execute(query)
else:
_cursor.executemany(query, dataset)
_connection.close()

def _create_table(self, table_name: str, schema: dict):
"""
        Converts a standardised dict object that represents a typical PostgreSQL
        table schema into a physical table in the database.
:param table_name: The name of the new table.
:param schema: The dict schema object of the new table which looks like:
schema = {
"user_id": {
"type": "INT",
"not_null": True,
"primary_key": True,
"unique": True
},
"user_name": {
"type": "INT",
"not_null": True,
"primary_key": False,
"unique": False,
},
}
"""
logging.info(
f"Attempting to create {table_name} table for PostgreSQL database."
)
field_set_string = []
count = 1
for field_name, field_meta in schema.items():
field_set_string.append(field_name)

if field_meta.get("type", False):
field_set_string.append(field_meta.get("type"))

            if field_meta.get("not_null", False):
field_set_string.append("NOT NULL")

if field_meta.get("primary_key", False):
field_set_string.append("PRIMARY KEY")

if field_meta.get("unique", False):
field_set_string.append("UNIQUE")

if len(schema) > count:
                # comma-separate each field definition except for the last one
field_set_string.append(",")

count += 1

query_frame_string = (
f"CREATE TABLE IF NOT EXISTS "
f"{table_name} (ts_id SERIAL, ts_date DATE DEFAULT now(), {' '.join(field_set_string)});"
)
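        # e.g. with the docstring schema and a hypothetical table name "users",
        # this builds:
        # CREATE TABLE IF NOT EXISTS users (ts_id SERIAL, ts_date DATE DEFAULT now(),
        #     user_id INT NOT NULL PRIMARY KEY UNIQUE , user_name INT NOT NULL);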

self._execute_query(query_frame_string)

logging.info(
f"Attempting to create {table_name} table for PostgreSQL database complete."
)

def _insert_table(self, table_name: str, dataset: list, truncate_on_insert=False):
"""
        Submits a dataset in record orientation that fits the table's schema.
:param table_name: The table to write to.
:param dataset: The dataset that fits the table schema.
:param truncate_on_insert: Bool to clear out table before inserting dataset.
"""

if truncate_on_insert:
logging.info(f"Attempting to truncate {table_name} table.")
query = f"TRUNCATE TABLE {table_name};"
self._execute_query(query)

logging.info(f"Attempting to insert data into {table_name} table.")
fields = [str(f) for f in dataset[0].keys()]

value_insert_set = []
for field in fields:
value_insert_set.append(f"%({field})s")

query = (
f"INSERT INTO {table_name}({', '.join(fields)}) "
f"VALUES ({', '.join(value_insert_set)})"
)
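        # e.g. for records shaped like {"viewId": "1234", "users": 42} and a
        # hypothetical table "ga_report", this builds:
        # INSERT INTO ga_report(viewId, users) VALUES (%(viewId)s, %(users)s)
        # and executemany binds each dict in the dataset to the named placeholders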
self._execute_query(query=query, dataset=dataset)
logging.info(f"Attempting to insert data into {table_name} table complete.")

def _drop_duplicates(self, table_name: str, schema: dict):
"""
        Drops duplicate rows from the table in a single statement: a subquery
        selects the first occurrence (MIN(ctid)) of each duplicated group, and
        the outer DELETE removes every other copy.
:param table_name: The name of the new table.
:param schema: The dict schema object of the new table which looks like:
schema = {
"user_id": {
"type": "INT",
"not_null": True,
"primary_key": True,
"unique": True
},
"user_name": {
"type": "INT",
"not_null": True,
"primary_key": False,
"unique": False,
},
}
"""

logging.info(f"Attempting to drop duplicates for {table_name} table.")
fields = schema.keys()
comparison_clauses = []
for field in fields:
comparison_clauses.append(f"a.{field} = b.{field}")
query = (
f"DELETE FROM {table_name} a USING (SELECT MIN(ctid) as ctid, {', '.join(fields)} "
f"FROM {table_name} GROUP BY {', '.join(fields)} HAVING COUNT(*) > 1) b WHERE "
f"{' AND '.join(comparison_clauses)} AND a.ctid <> b.ctid;"
)
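        # e.g. for a hypothetical table "ga_report" with fields viewId and users:
        # DELETE FROM ga_report a USING (SELECT MIN(ctid) as ctid, viewId, users
        #     FROM ga_report GROUP BY viewId, users HAVING COUNT(*) > 1) b WHERE
        #     a.viewId = b.viewId AND a.users = b.users AND a.ctid <> b.ctid;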
logging.info(query)
self._execute_query(query)
logging.info(f"Attempting to drop duplicates for {table_name} table complete.")
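
Taken together, the writer can be exercised directly against the compose service above; a minimal, hedged sketch (the table name, schema and credential values are illustrative, and _PostgreSQLWriter is a private class normally driven by the readers' write_data_to_postgresql methods):

from turbo_stream.postgresql.writer import _PostgreSQLWriter

# the connection is tested in __init__ via _test_cursor()
writer = _PostgreSQLWriter(
    credentials={"host": "localhost", "user": "postgres", "password": "postgres"}
)

schema = {
    "viewId": {"type": "VARCHAR", "not_null": True},
    "users": {"type": "NUMERIC", "not_null": True},
}
writer._create_table(table_name="ga_report", schema=schema)
writer._insert_table(
    table_name="ga_report",
    dataset=[{"viewId": "1234", "users": 42}],
    truncate_on_insert=True,
)
writer._drop_duplicates(table_name="ga_report", schema=schema)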