Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions dtbase/ingress/ingress_monitored_from_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
Python module to import data from csv file 'MonitoredSynthetic_5to25_05.csv'
"""
import logging
import os
from datetime import datetime, timedelta
from typing import Tuple, Union

import pandas as pd
import requests

from dtbase.services.base import BaseIngress

# Mapping of CSV metrics to sensor measures in the database.
METRICS_TO_MEASURES = {
"temperature": {
"name": "Air temperature",
"units": "degrees Celsius",
"datatype": "float",
},
"relative_humidity": {
"name": "Relative Humidity",
"units": "percentage",
"datatype": "float",
},
}

# Type of sensor for OpenWeatherMap
SENSOR_TYPE = {
"name": "Monitored",
"description": (
"Monitored data (synthetic), "
),
"measures": [
METRICS_TO_MEASURES["temperature"],
METRICS_TO_MEASURES["relative_humidity"],
],
}

#
SENSOR_MONITOREDCSV = {
"unique_identifier": "MonitoredCSV",
"type_name": "Monitored",
"name": "Synthetic",
}

class CSVDataIngress(BaseIngress):
"""
Custom class inheriting from the BaseIngress class for interacting with the
OpenWeatherData API.
"""

def __init__(self) -> None:
super().__init__()
self.present = datetime.now()

def get_service_data(self):
csvdata = pd.read_csv('/Users/rward/DTBase/dtbase/models/MonitoredSynthetic_5to25_05.csv') # dataframe

csvdata['DateTime'] = pd.to_datetime(csvdata['DateTime'], format='%d/%m/%Y %H:%M:%S')

hourly_records = []
for i,row in csvdata.iterrows():
record = {}
record["timestamp"] = row['DateTime']
record["temperature"] = row["T_air"]
record["relative_humidity"] = row["RH"]

hourly_records.append(record)

monitored_df = pd.DataFrame(hourly_records)
monitored_df.set_index("timestamp", inplace=True)

all_timestamps = [ts.isoformat() for ts in monitored_df.index]
measure_payloads = []
for metric in monitored_df.columns:
values = monitored_df[metric]
# Some values may be None, filter those out.
timestamps, values = zip(
*((t, v) for t, v in zip(all_timestamps, values) if v is not None)
)

measure_payloads.append(
{
"measure_name": METRICS_TO_MEASURES[metric]["name"],
"unique_identifier": SENSOR_MONITOREDCSV["unique_identifier"],
"readings": values,
"timestamps": timestamps,
}
)

# Define outputs in the format (endpoint, payload)
sensor_type_output = [("/sensor/insert-sensor-type", SENSOR_TYPE)]
sensor_output = [("/sensor/insert-sensor", SENSOR_MONITOREDCSV)]
sensor_readings_output = [
("/sensor/insert-sensor-readings", payload) for payload in measure_payloads
]

return sensor_type_output + sensor_output + sensor_readings_output



def example_weather_ingress() -> None:
"""
Ingress weather data from 60 hours before today and 2 days after.
As there are two different APIs used for past and future, we need to make two
seperate calls. This is likely to be specific to the weather ingress. Other
ingress methods may not need to do this.
"""
api_key = os.environ.get("DT_OPENWEATHERMAP_APIKEY")
latitude = 51.53
longitude = -0.127

# First do calls from before now
dt_from = datetime.now() - timedelta(days=5)
dt_to = "present"
weather_ingress = OpenWeatherDataIngress()
weather_ingress(
dt_from=dt_from,
dt_to=dt_to,
api_key=api_key,
latitude=latitude,
longitude=longitude,
)

# Now repeat for after
dt_from = "present"
dt_to = datetime.now() + timedelta(days=2)
weather_ingress = OpenWeatherDataIngress()
weather_ingress(
dt_from=dt_from,
dt_to=dt_to,
api_key=api_key,
latitude=latitude,
longitude=longitude,
)


if __name__ == "__main__":
ingress = CSVDataIngress()
ingress()
Loading