Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions alembic/versions/0ea9ab7cb60f_add_inbox_mail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Add inbox mail

Revision ID: 0ea9ab7cb60f
Revises: 85bb3ebee137
Create Date: 2025-11-21 13:43:44.056253

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '0ea9ab7cb60f'
down_revision = '85bb3ebee137'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('inbox_mail_thread',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('subject', sa.String(), nullable=True),
sa.Column('meta_data', sa.JSON(), nullable=True),
sa.Column('is_important', sa.Boolean(), nullable=True),
sa.Column('progress_state', sa.String(), nullable=True),
sa.Column('support_owner_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('is_admin_support_thread', sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['support_owner_id'], ['user.id'], ondelete='SET NULL'),
sa.PrimaryKeyConstraint('id'),
schema='global'
)
op.create_index(op.f('ix_global_inbox_mail_thread_created_by'), 'inbox_mail_thread', ['created_by'], unique=False, schema='global')
op.create_index(op.f('ix_global_inbox_mail_thread_organization_id'), 'inbox_mail_thread', ['organization_id'], unique=False, schema='global')
op.create_index(op.f('ix_global_inbox_mail_thread_support_owner_id'), 'inbox_mail_thread', ['support_owner_id'], unique=False, schema='global')
op.create_table('inbox_mail',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('sender_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('thread_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('content', sa.String(), nullable=True),
sa.ForeignKeyConstraint(['sender_id'], ['user.id'], ondelete='SET NULL'),
sa.ForeignKeyConstraint(['thread_id'], ['global.inbox_mail_thread.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
schema='global'
)
op.create_index(op.f('ix_global_inbox_mail_sender_id'), 'inbox_mail', ['sender_id'], unique=False, schema='global')
op.create_index(op.f('ix_global_inbox_mail_thread_id'), 'inbox_mail', ['thread_id'], unique=False, schema='global')
op.create_table('inbox_mail_thread_association',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('thread_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('unread_mail_count', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['thread_id'], ['global.inbox_mail_thread.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
schema='global'
)
op.create_index(op.f('ix_global_inbox_mail_thread_association_thread_id'), 'inbox_mail_thread_association', ['thread_id'], unique=False, schema='global')
op.create_index(op.f('ix_global_inbox_mail_thread_association_user_id'), 'inbox_mail_thread_association', ['user_id'], unique=False, schema='global')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_global_inbox_mail_thread_association_user_id'), table_name='inbox_mail_thread_association', schema='global')
op.drop_index(op.f('ix_global_inbox_mail_thread_association_thread_id'), table_name='inbox_mail_thread_association', schema='global')
op.drop_table('inbox_mail_thread_association', schema='global')
op.drop_index(op.f('ix_global_inbox_mail_thread_id'), table_name='inbox_mail', schema='global')
op.drop_index(op.f('ix_global_inbox_mail_sender_id'), table_name='inbox_mail', schema='global')
op.drop_table('inbox_mail', schema='global')
op.drop_index(op.f('ix_global_inbox_mail_thread_support_owner_id'), table_name='inbox_mail_thread', schema='global')
op.drop_index(op.f('ix_global_inbox_mail_thread_organization_id'), table_name='inbox_mail_thread', schema='global')
op.drop_index(op.f('ix_global_inbox_mail_thread_created_by'), table_name='inbox_mail_thread', schema='global')
op.drop_table('inbox_mail_thread', schema='global')
# ### end Alembic commands ###
6 changes: 6 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from fast_api.routes.task_execution import router as task_execution_router
from fast_api.routes.record_internal import router as record_internal_router
from fast_api.routes.playground import router as playground_router
from fast_api.routes.inbox_mail import router as inbox_mail_router
from middleware.database_session import handle_db_session
from middleware.starlette_tmp_middleware import DatabaseSessionHandler
from starlette.applications import Starlette
Expand Down Expand Up @@ -62,6 +63,7 @@
PREFIX_LABELING_TASKS,
PREFIX_TASK_EXECUTION,
PREFIX_PLAYGROUND,
PREFIX_INBOX_MAIL,
)
from util import security, clean_up
from middleware import log_storage
Expand Down Expand Up @@ -121,6 +123,10 @@
playground_router, prefix=PREFIX_PLAYGROUND, tags=["playground"]
)

fastapi_app.include_router(
inbox_mail_router, prefix=PREFIX_INBOX_MAIL, tags=["inbox_mail"]
)

app_name_internal = app_name + "-i"
fastapi_app_internal = FastAPI(title=app_name_internal)

Expand Down
51 changes: 49 additions & 2 deletions controller/auth/kratos.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from datetime import datetime, timedelta
from urllib.parse import quote

from controller.user import manager
from submodules.model import daemon
from submodules.model.business_objects import general, user


logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -76,7 +77,7 @@ def __refresh_identity_cache(update_db_users: bool = True) -> None:
KRATOS_IDENTITY_CACHE = {}

if update_db_users:
manager.migrate_kratos_users()
migrate_kratos_users()


def __get_link_from_kratos_request(request: requests.Response) -> str:
Expand Down Expand Up @@ -293,3 +294,49 @@ def check_user_exists(email: str) -> bool:
if i["traits"]["email"].lower() == email.lower():
return True
return False


def migrate_kratos_users() -> None:
daemon.run_with_db_token(__migrate_kratos_users)


def __migrate_kratos_users():
users_kratos = get_cached_values(False)
users_database = user.get_all()

for user_database in users_database:
user_id = str(user_database.id)
if user_id not in users_kratos or users_kratos[user_id] is None:
continue
user_identity = users_kratos[user_id]["identity"]
if user_database.email != user_identity["traits"]["email"]:
user_database.email = user_identity["traits"]["email"]
if (
user_database.verified
!= user_identity["verifiable_addresses"][0]["verified"]
):
user_database.verified = user_identity["verifiable_addresses"][0][
"verified"
]
if (
user_database.created_at
!= user_identity["verifiable_addresses"][0]["created_at"]
):
user_database.created_at = user_identity["verifiable_addresses"][0][
"created_at"
]
if user_database.metadata_public != user_identity["metadata_public"]:
user_database.metadata_public = user_identity["metadata_public"]
sso_provider = (
(
user_identity["metadata_public"]
.get("registration_scope", {})
.get("provider_id", None)
)
if user_identity["metadata_public"]
else None
)
if user_database.sso_provider != sso_provider:
user_database.sso_provider = sso_provider

general.commit()
Empty file.
198 changes: 198 additions & 0 deletions controller/inbox_mail/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
from typing import Any, Dict, List, Optional
from submodules.model.business_objects import general, user
from submodules.model.global_objects import inbox_mail
from submodules.model.util import sql_alchemy_to_dict
from controller.auth import kratos
from sqlalchemy.orm.attributes import flag_modified

DEFAULT_KERN_AI_ADMIN_NAME = {"first": "Kern AI Support", "last": ""}
DEFAULT_SYSTEM_AUTOGENERATED_NAME = {"first": "System", "last": ""}


def create_inbox_mail_by_thread(
org_id: str,
sender_id: str,
recipient_ids: List[str],
subject: str,
content: str,
is_important: bool = False,
is_admin_support_thread: bool = False,
meta_data: Optional[Dict[str, Any]] = None,
thread_id: Optional[str] = None,
) -> Dict[str, Any]:

return inbox_mail.create_by_thread(
org_id=org_id,
sender_id=sender_id,
content=content,
recipient_ids=recipient_ids,
subject=subject,
meta_data=meta_data,
thread_id=thread_id,
is_important=is_important,
is_admin_support_thread=is_admin_support_thread,
)


def get_inbox_mails_by_thread(
org_id: str,
user_id: str,
thread_id: Optional[str] = None,
user_is_admin: bool = False,
) -> List[Dict[str, Any]]:
mails = inbox_mail.get_by_thread(
org_id, user_id, thread_id, user_is_admin=user_is_admin
)
participant_ids = [
str(pid) for pid in inbox_mail.get_participant_ids_by_thread_id(thread_id)
]
admin_user_ids = [str(u.id) for u in user.get_admin_users()]

mail_dicts = [sql_alchemy_to_dict(mail) for mail in mails]
thread_entity = inbox_mail.get_inbox_mail_thread_by_id(thread_id)
thread_association = (
inbox_mail.get_inbox_mail_thread_association_by_thread_id_and_user_id(
thread_id, user_id
)
)
if thread_association:
thread_association.unread_mail_count = 0
general.flush_or_commit(True)
if thread_entity and thread_entity.is_admin_support_thread:
meta_data = thread_entity.meta_data or {}
if user_id in admin_user_ids:
if meta_data.get("unreadMailCountAdmin", 0) > 0:
meta_data["unreadMailCountAdmin"] = 0
thread_entity.meta_data = meta_data
flag_modified(thread_entity, "meta_data")
general.flush_or_commit(True)
for mail_dict in mail_dicts:
extend_inbox_mail_sender_receiver_names(
participant_ids=participant_ids,
mail_dict=mail_dict,
user_id=user_id,
admin_user_ids=admin_user_ids,
is_admin_support_thread=(
thread_entity.is_admin_support_thread if thread_entity else False
),
is_auto_generated=thread_entity.meta_data.get("autoGenerated", False),
)
return mail_dicts


def get_inbox_mail_threads_overview(
org_id: str,
user_id: str,
page: int = 1,
limit: int = 10,
user_is_admin: bool = False,
) -> Dict[str, Any]:
overview_by_threads = inbox_mail.get_overview_by_threads(
org_id=org_id,
user_id=user_id,
page=page,
limit=limit,
user_is_admin=user_is_admin,
)

admin_user_ids = [str(u.id) for u in user.get_admin_users()]

for thread in overview_by_threads["threads"]:
if not thread.get("latest_mail"):
continue
if not user_is_admin:
thread.pop("meta_data", None)

extend_inbox_mail_sender_receiver_names(
participant_ids=thread["participant_ids"],
mail_dict=thread["latest_mail"],
user_id=user_id,
admin_user_ids=admin_user_ids,
is_admin_support_thread=thread.get("is_admin_support_thread", False),
is_auto_generated=thread.get("meta_data", {}).get("autoGenerated", False),
)

return overview_by_threads


def create_auto_generated_inbox_mail_thread_for_error(
org_id: str,
created_by: str,
subject: str,
content: str,
is_important: bool = False,
meta_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:

inbox_mail.create_by_thread(
org_id=org_id,
sender_id=None,
content=content,
recipient_ids=[],
subject=subject,
meta_data=meta_data,
thread_id=None,
is_important=is_important,
created_by=created_by,
is_admin_support_thread=True,
)


def get_new_inbox_mails_info(
org_id: str, user_id: str, user_is_admin: bool
) -> Dict[str, int]:
return inbox_mail.get_new_inbox_mails(
org_id=org_id, user_id=user_id, user_is_admin=user_is_admin
)


def extend_inbox_mail_sender_receiver_names(
participant_ids: List[str],
mail_dict: Dict[str, Any],
user_id: str,
admin_user_ids: list[str],
is_admin_support_thread: bool = False,
is_auto_generated: bool = False,
) -> None:

recipient_ids = [
rid for rid in participant_ids if str(rid) != str(mail_dict.get("sender_id"))
]
if not is_admin_support_thread:
mail_dict["senderName"] = kratos.resolve_user_name_by_id(mail_dict["sender_id"])
mail_dict["recipientNames"] = [
kratos.resolve_user_name_by_id(rid) for rid in recipient_ids
]
return

user_is_admin = str(user_id) in admin_user_ids
user_is_sender = str(user_id) == str(mail_dict["sender_id"])
sender_is_admin = str(mail_dict["sender_id"]) in admin_user_ids
if is_auto_generated and user_is_admin:

if not (mail_dict.get("sender_id")):
mail_dict["senderName"] = DEFAULT_SYSTEM_AUTOGENERATED_NAME
mail_dict["recipientNames"] = [DEFAULT_KERN_AI_ADMIN_NAME]
else:
mail_dict["senderName"] = DEFAULT_KERN_AI_ADMIN_NAME
mail_dict["recipientNames"] = [DEFAULT_SYSTEM_AUTOGENERATED_NAME]
return
if (user_is_sender and not user_is_admin) or (
user_is_admin and not sender_is_admin
):
mail_dict["senderName"] = kratos.resolve_user_name_by_id(mail_dict["sender_id"])
mail_dict["recipientNames"] = [DEFAULT_KERN_AI_ADMIN_NAME]

elif not user_is_sender and not user_is_admin:
mail_dict["senderName"] = DEFAULT_KERN_AI_ADMIN_NAME
mail_dict["recipientNames"] = [
kratos.resolve_user_name_by_id(rid) for rid in recipient_ids
]
elif user_is_admin and user_is_sender and user_id in participant_ids:
mail_dict["senderName"] = kratos.resolve_user_name_by_id(mail_dict["sender_id"])
mail_dict["recipientNames"] = [DEFAULT_KERN_AI_ADMIN_NAME]
elif user_is_admin and sender_is_admin:
mail_dict["senderName"] = DEFAULT_KERN_AI_ADMIN_NAME
mail_dict["recipientNames"] = [
kratos.resolve_user_name_by_id(rid) for rid in recipient_ids
]
Loading