Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions pysatl_experiment/persistence/power/alchemy/alchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from __future__ import annotations

import json
from typing import ClassVar

from sqlalchemy import Float, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from typing_extensions import override

from pysatl_experiment.persistence.db_store.base import ModelBase, SessionType
from pysatl_experiment.persistence.db_store.model import AbstractDbStore
from pysatl_experiment.persistence.model.power.power import IPowerStorage, PowerModel, PowerQuery


class AlchemyPower(ModelBase):
    """ORM row holding one power-experiment result.

    Parameter collections are stored as JSON-serialized strings; the
    ``uq_power_unique`` constraint enforces one row per full experiment key.
    ``experiment_id`` and ``results_criteria`` are deliberately outside the
    unique key (see the storage's upsert logic, which rewrites them).
    """

    __tablename__ = "power"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(Integer, primary_key=True)  # type: ignore
    # Owning experiment; not part of the unique key.
    experiment_id: Mapped[int] = mapped_column(Integer, nullable=False)  # type: ignore
    criterion_code: Mapped[str] = mapped_column(String, nullable=False, index=True)  # type: ignore
    # JSON-serialized criterion parameters (compared as exact strings).
    criterion_parameters: Mapped[str] = mapped_column(String, nullable=False, index=True)  # type: ignore
    sample_size: Mapped[int] = mapped_column(Integer, nullable=False, index=True)  # type: ignore
    alternative_code: Mapped[str] = mapped_column(String, nullable=False, index=True)  # type: ignore
    # JSON-serialized alternative-hypothesis parameters.
    alternative_parameters: Mapped[str] = mapped_column(String, nullable=False, index=True)  # type: ignore
    monte_carlo_count: Mapped[int] = mapped_column(Integer, nullable=False, index=True)  # type: ignore
    significance_level: Mapped[float] = mapped_column(Float, nullable=False, index=True)  # type: ignore
    # JSON-serialized result payload.
    results_criteria: Mapped[str] = mapped_column(String, nullable=False)  # type: ignore

    __table_args__ = (
        UniqueConstraint(
            "criterion_code",
            "criterion_parameters",
            "sample_size",
            "alternative_code",
            "alternative_parameters",
            "monte_carlo_count",
            "significance_level",
            name="uq_power_unique",
        ),
    )


class AlchemyPowerStorage(AbstractDbStore, IPowerStorage):
    """SQLAlchemy-backed implementation of ``IPowerStorage``.

    A power result is uniquely identified by (criterion_code,
    criterion_parameters, sample_size, alternative_code,
    alternative_parameters, monte_carlo_count, significance_level),
    mirroring the ``uq_power_unique`` constraint. Parameter collections are
    JSON-serialized for storage and compared as exact strings.
    """

    # Scoped session; populated class-wide by AbstractDbStore.init().
    session: ClassVar[SessionType]

    def __init__(self, db_url: str):
        super().__init__(db_url=db_url)
        self._initialized: bool = False

    @override
    def init(self) -> None:
        """Initialize engine and scoped session via ``AbstractDbStore``."""
        super().init()
        self._initialized = True

    def _get_session(self) -> SessionType:
        """Return the shared session, failing fast if ``init()`` was never called."""
        if not getattr(self, "_initialized", False):
            raise RuntimeError("Storage not initialized. Call init() first.")
        return AlchemyPowerStorage.session

    @staticmethod
    def _key_conditions(item: PowerQuery | PowerModel) -> list:
        """Build the filter conditions identifying a unique power row.

        Shared by get/insert/delete so the key definition lives in one place
        (it was previously duplicated verbatim in all three methods).
        ``item`` may be a query or a model; both expose the key fields.
        """
        return [
            AlchemyPower.criterion_code == item.criterion_code,
            AlchemyPower.criterion_parameters == json.dumps(item.criterion_parameters),
            AlchemyPower.sample_size == int(item.sample_size),
            AlchemyPower.alternative_code == item.alternative_code,
            AlchemyPower.alternative_parameters == json.dumps(item.alternative_parameters),
            AlchemyPower.monte_carlo_count == int(item.monte_carlo_count),
            # NOTE(review): exact float equality — assumes the level round-trips
            # through the DB unchanged; confirm for the target dialect.
            AlchemyPower.significance_level == float(item.significance_level),
        ]

    @override
    def get_data(self, query: PowerQuery) -> PowerModel | None:
        """Return the stored power result matching ``query``, or ``None``."""
        row: AlchemyPower | None = (
            self._get_session()
            .query(AlchemyPower)
            .filter(*self._key_conditions(query))
            .one_or_none()
        )
        if row is None:
            return None
        # Echo the key fields from the query; only stored payload fields
        # (experiment_id, results_criteria) are read back from the row.
        return PowerModel(
            experiment_id=int(row.experiment_id),
            criterion_code=query.criterion_code,
            criterion_parameters=query.criterion_parameters,
            sample_size=query.sample_size,
            alternative_code=query.alternative_code,
            alternative_parameters=query.alternative_parameters,
            monte_carlo_count=query.monte_carlo_count,
            significance_level=query.significance_level,
            results_criteria=json.loads(row.results_criteria),
        )

    @override
    def insert_data(self, data: PowerModel) -> None:
        """Insert the power result, or update the payload of an existing row (upsert)."""
        session = self._get_session()
        existing: AlchemyPower | None = (
            session.query(AlchemyPower)
            .filter(*self._key_conditions(data))
            .one_or_none()
        )
        if existing is None:
            session.add(
                AlchemyPower(
                    experiment_id=int(data.experiment_id),
                    criterion_code=data.criterion_code,
                    criterion_parameters=json.dumps(data.criterion_parameters),
                    sample_size=int(data.sample_size),
                    alternative_code=data.alternative_code,
                    alternative_parameters=json.dumps(data.alternative_parameters),
                    monte_carlo_count=int(data.monte_carlo_count),
                    significance_level=float(data.significance_level),
                    results_criteria=json.dumps(data.results_criteria),
                )
            )
        else:
            # Key matched: refresh only the non-key payload columns.
            existing.experiment_id = int(data.experiment_id)
            existing.results_criteria = json.dumps(data.results_criteria)
        session.commit()

    @override
    def delete_data(self, query: PowerQuery) -> None:
        """Delete the power result matching ``query`` (no-op if absent)."""
        session = self._get_session()
        session.query(AlchemyPower).filter(*self._key_conditions(query)).delete()
        session.commit()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Public package interface: re-exports the SQLAlchemy-backed random-values storage."""

from .alchemy import AlchemyRandomValuesStorage


__all__ = ["AlchemyRandomValuesStorage"]
234 changes: 234 additions & 0 deletions pysatl_experiment/persistence/random_values/alchemy/alchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
from __future__ import annotations

import json
from typing import ClassVar

from sqlalchemy import Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from typing_extensions import override

from pysatl_experiment.persistence.db_store.base import ModelBase, SessionType
from pysatl_experiment.persistence.db_store.model import AbstractDbStore
from pysatl_experiment.persistence.model.random_values.random_values import (
IRandomValuesStorage,
RandomValuesAllModel,
RandomValuesAllQuery,
RandomValuesCountQuery,
RandomValuesModel,
RandomValuesQuery,
)


class AlchemyRandomValues(ModelBase):
    """ORM row holding one generated sample of random values.

    Generator parameters and the sample itself are stored as JSON-serialized
    strings; ``uq_random_values_unique`` enforces one row per
    (generator, parameters, sample_size, sample_num) tuple.
    """

    __tablename__ = "random_values"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(Integer, primary_key=True)  # type: ignore
    generator_name: Mapped[str] = mapped_column(String, nullable=False, index=True)  # type: ignore
    # JSON-serialized generator parameters (compared as exact strings).
    generator_parameters: Mapped[str] = mapped_column(String, nullable=False, index=True)  # type: ignore
    sample_size: Mapped[int] = mapped_column(Integer, nullable=False, index=True)  # type: ignore
    # Ordinal of the sample within its (generator, parameters, size) group.
    sample_num: Mapped[int] = mapped_column(Integer, nullable=False)  # type: ignore
    # JSON-serialized sample values.
    data: Mapped[str] = mapped_column(String, nullable=False)  # type: ignore

    __table_args__ = (
        UniqueConstraint(
            "generator_name",
            "generator_parameters",
            "sample_size",
            "sample_num",
            name="uq_random_values_unique",
        ),
    )


class AlchemyRandomValuesStorage(AbstractDbStore, IRandomValuesStorage):
    """SQLAlchemy-backed implementation of ``IRandomValuesStorage``.

    Samples are grouped by (generator_name, generator_parameters,
    sample_size) and individually addressed by ``sample_num`` within a
    group. Generator parameters and sample data are JSON-serialized for
    storage and compared as exact strings.
    """

    # Scoped session; populated class-wide by AbstractDbStore.init().
    session: ClassVar[SessionType]

    def __init__(self, db_url: str):
        super().__init__(db_url=db_url)
        self._initialized: bool = False

    @override
    def init(self) -> None:
        """Initialize engine and scoped session via ``AbstractDbStore``."""
        super().init()
        self._initialized = True

    def _get_session(self) -> SessionType:
        """Return the shared session, failing fast if ``init()`` was never called."""
        if not getattr(self, "_initialized", False):
            raise RuntimeError("Storage not initialized. Call init() first.")
        return AlchemyRandomValuesStorage.session

    @staticmethod
    def _group_conditions(
        query: RandomValuesQuery | RandomValuesAllQuery | RandomValuesCountQuery | RandomValuesAllModel | RandomValuesModel,
    ) -> list:
        """Filter conditions selecting every sample of one generator group.

        Centralizes the (generator_name, generator_parameters, sample_size)
        key that was previously duplicated verbatim across seven methods.
        """
        return [
            AlchemyRandomValues.generator_name == query.generator_name,
            AlchemyRandomValues.generator_parameters == json.dumps(query.generator_parameters),
            AlchemyRandomValues.sample_size == int(query.sample_size),
        ]

    @classmethod
    def _sample_conditions(cls, query: RandomValuesQuery | RandomValuesModel) -> list:
        """Filter conditions selecting exactly one sample (group key + sample_num)."""
        return [*cls._group_conditions(query), AlchemyRandomValues.sample_num == int(query.sample_num)]

    @staticmethod
    def _to_model(query: RandomValuesAllQuery | RandomValuesCountQuery, row: AlchemyRandomValues) -> RandomValuesModel:
        """Convert an ORM row to the domain model, echoing key fields from the query."""
        return RandomValuesModel(
            generator_name=query.generator_name,
            generator_parameters=query.generator_parameters,
            sample_size=query.sample_size,
            sample_num=row.sample_num,
            data=json.loads(row.data),
        )

    @override
    def get_data(self, query: RandomValuesQuery) -> RandomValuesModel | None:
        """Return the single sample matching ``query``, or ``None``."""
        row: AlchemyRandomValues | None = (
            self._get_session()
            .query(AlchemyRandomValues)
            .filter(*self._sample_conditions(query))
            .one_or_none()
        )
        if row is None:
            return None
        return RandomValuesModel(
            generator_name=query.generator_name,
            generator_parameters=query.generator_parameters,
            sample_size=query.sample_size,
            sample_num=query.sample_num,
            data=json.loads(row.data),
        )

    @override
    def insert_data(self, data: RandomValuesModel) -> None:
        """Insert the sample, or overwrite the data of an existing row (upsert)."""
        session = self._get_session()
        existing: AlchemyRandomValues | None = (
            session.query(AlchemyRandomValues)
            .filter(*self._sample_conditions(data))
            .one_or_none()
        )
        if existing is None:
            session.add(
                AlchemyRandomValues(
                    generator_name=data.generator_name,
                    generator_parameters=json.dumps(data.generator_parameters),
                    sample_size=int(data.sample_size),
                    sample_num=int(data.sample_num),
                    data=json.dumps(data.data),
                )
            )
        else:
            existing.data = json.dumps(data.data)
        session.commit()

    @override
    def delete_data(self, query: RandomValuesQuery) -> None:
        """Delete the single sample matching ``query`` (no-op if absent)."""
        session = self._get_session()
        session.query(AlchemyRandomValues).filter(*self._sample_conditions(query)).delete()
        session.commit()

    @override
    def get_rvs_count(self, query: RandomValuesAllQuery) -> int:
        """Return how many samples are stored for the generator group."""
        return (
            self._get_session()
            .query(AlchemyRandomValues)
            .filter(*self._group_conditions(query))
            .count()
        )

    @override
    def insert_all_data(self, query: RandomValuesAllModel) -> None:
        """Replace the whole generator group with the samples in ``query.data``.

        Existing rows for the group are deleted first; new samples are
        numbered sequentially from 1 in the order given.
        """
        session = self._get_session()
        session.query(AlchemyRandomValues).filter(*self._group_conditions(query)).delete()
        params_json = json.dumps(query.generator_parameters)
        for num, sample in enumerate(query.data, start=1):
            session.add(
                AlchemyRandomValues(
                    generator_name=query.generator_name,
                    generator_parameters=params_json,
                    sample_size=int(query.sample_size),
                    sample_num=num,
                    data=json.dumps(sample),
                )
            )
        session.commit()

    @override
    def get_all_data(self, query: RandomValuesAllQuery) -> list[RandomValuesModel] | None:
        """Return every sample in the generator group, ordered by ``sample_num``.

        Returns an empty list (not ``None``) when the group has no rows.
        """
        rows: list[AlchemyRandomValues] = (
            self._get_session()
            .query(AlchemyRandomValues)
            .filter(*self._group_conditions(query))
            .order_by(AlchemyRandomValues.sample_num)
            .all()
        )
        return [self._to_model(query, row) for row in rows]

    @override
    def delete_all_data(self, query: RandomValuesAllQuery) -> None:
        """Delete every sample in the generator group (no-op if absent)."""
        session = self._get_session()
        session.query(AlchemyRandomValues).filter(*self._group_conditions(query)).delete()
        session.commit()

    @override
    def get_count_data(self, query: RandomValuesCountQuery) -> list[RandomValuesModel] | None:
        """Return the first ``query.count`` samples of the group, ordered by ``sample_num``."""
        rows: list[AlchemyRandomValues] = (
            self._get_session()
            .query(AlchemyRandomValues)
            .filter(*self._group_conditions(query))
            .order_by(AlchemyRandomValues.sample_num)
            .limit(int(query.count))
            .all()
        )
        return [self._to_model(query, row) for row in rows]
Loading
Loading