From 6cad942d7cdc64b1cc768f9ea2404e3f085e63e3 Mon Sep 17 00:00:00 2001 From: "Ma, Xiangxiang" Date: Mon, 27 Oct 2025 12:25:47 +0100 Subject: [PATCH 1/2] use ServerSentEvent from sse_starlette Signed-off-by: Ma, Xiangxiang --- application/backend/src/api/endpoints/job_endpoints.py | 6 +++--- application/backend/src/services/job_service.py | 9 ++++----- application/backend/tests/unit/endpoints/test_jobs.py | 7 ++++--- .../backend/tests/unit/services/test_job_service.py | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/application/backend/src/api/endpoints/job_endpoints.py b/application/backend/src/api/endpoints/job_endpoints.py index a0fba9149b..cd2841ba7e 100644 --- a/application/backend/src/api/endpoints/job_endpoints.py +++ b/application/backend/src/api/endpoints/job_endpoints.py @@ -5,7 +5,7 @@ from uuid import UUID from fastapi import APIRouter, Body, Depends -from fastapi.responses import StreamingResponse +from sse_starlette import EventSourceResponse from api.dependencies import get_job_id, get_job_service from api.endpoints import API_PREFIX @@ -39,6 +39,6 @@ async def submit_train_job( async def get_job_logs( job_id: Annotated[UUID, Depends(get_job_id)], job_service: Annotated[JobService, Depends(get_job_service)], -) -> StreamingResponse: +) -> EventSourceResponse: """Endpoint to get the logs of a job by its ID""" - return StreamingResponse(job_service.stream_logs(job_id=job_id), media_type="text/event-stream") + return EventSourceResponse(job_service.stream_logs(job_id=job_id)) diff --git a/application/backend/src/services/job_service.py b/application/backend/src/services/job_service.py index e4eb1ffe52..e771813122 100644 --- a/application/backend/src/services/job_service.py +++ b/application/backend/src/services/job_service.py @@ -3,10 +3,12 @@ import asyncio import datetime import os +from collections.abc import AsyncGenerator from uuid import UUID import anyio from sqlalchemy.exc import IntegrityError +from sse_starlette import ServerSentEvent from db import get_async_db_session_ctx from exceptions import DuplicateJobException, ResourceNotFoundException @@ -75,7 +77,7 @@ async def update_job_status( await repo.update(job, updates) @classmethod - async def stream_logs(cls, job_id: UUID | str): + async def stream_logs(cls, job_id: UUID | str) -> AsyncGenerator[ServerSentEvent, None]: from core.logging.utils import get_job_logs_path log_file = get_job_logs_path(job_id=job_id) @@ -110,8 +112,5 @@ async def is_job_still_running(): continue # No more lines are expected else: - yield "data: DONE\n\n" break - - # Format as an SSE message - yield f"data: {line.rstrip()}\n\n" + yield ServerSentEvent(data=line.rstrip()) diff --git a/application/backend/tests/unit/endpoints/test_jobs.py b/application/backend/tests/unit/endpoints/test_jobs.py index b4036cf949..213ad63d65 100644 --- a/application/backend/tests/unit/endpoints/test_jobs.py +++ b/application/backend/tests/unit/endpoints/test_jobs.py @@ -4,6 +4,7 @@ import pytest from fastapi import status +from sse_starlette import ServerSentEvent from api.dependencies import get_job_service from main import app @@ -41,8 +42,8 @@ def test_get_job_logs_success(fxt_client, fxt_job_service, fxt_job): # Mock the stream_logs generator async def mock_stream(): - yield '{"level": "INFO", "message": "Line 1"}\n' - yield '{"level": "INFO", "message": "Line 2"}\n' + yield ServerSentEvent(data='{"level": "INFO", "message": "Line 1"}') + yield ServerSentEvent(data='{"level": "INFO", "message": "Line 2"}') fxt_job_service.stream_logs.return_value = mock_stream() @@ -52,7 +53,7 @@ async def mock_stream(): # Verify the streamed content content = response.content.decode("utf-8") lines = [line for line in content.split("\n") if line] - assert len(lines) == 2 + assert len(lines) == 4 # 2 events + 2 newlines assert '"level": "INFO"' in lines[0] assert '"message": "Line 1"' in lines[0] diff --git a/application/backend/tests/unit/services/test_job_service.py b/application/backend/tests/unit/services/test_job_service.py index ef84af816d..c3bbf36627 100644 --- a/application/backend/tests/unit/services/test_job_service.py +++ b/application/backend/tests/unit/services/test_job_service.py @@ -206,7 +206,7 @@ async def consume_stream(): def test_stream_logs_success(self, fxt_job_repository, fxt_job): """Test streaming logs successfully from a completed job.""" - log_lines = ['{"level": "INFO", "message": "Line 1"}\n', '{"level": "INFO", "message": "Line 2"}\n'] + log_lines = ['{"level": "INFO", "message": "Line 1"}', '{"level": "INFO", "message": "Line 2"}'] # Mock job as completed completed_job = fxt_job.model_copy(update={"status": JobStatus.COMPLETED}) @@ -240,7 +240,7 @@ async def mock_anyio_open_file(*args, **kwargs): async def consume_stream(): result = [] async for line in JobService.stream_logs(fxt_job.id): - result.append(line) + result.append(line.data) return result result = asyncio.run(consume_stream()) From 5cd863bcc91aa7bf0da23fe5a0984ecba2fb127c Mon Sep 17 00:00:00 2001 From: "Ma, Xiangxiang" Date: Mon, 27 Oct 2025 17:05:48 +0100 Subject: [PATCH 2/2] fix test Signed-off-by: Ma, Xiangxiang --- application/backend/pyproject.toml | 1 + .../tests/unit/services/test_job_service.py | 21 +++++++++++++++---- application/backend/uv.lock | 14 +++++++++++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/application/backend/pyproject.toml b/application/backend/pyproject.toml index 03755ac72c..9cd9cc3a8b 100644 --- a/application/backend/pyproject.toml +++ b/application/backend/pyproject.toml @@ -33,6 +33,7 @@ dev = [ "flaky~=3.8", "testfixtures~=8.2", "httpx~=0.24", + "freezegun~=1.5", ] lint = [ "ruff~=0.11.2", diff --git a/application/backend/tests/unit/services/test_job_service.py b/application/backend/tests/unit/services/test_job_service.py index c3bbf36627..bbf47b2066 100644 --- a/application/backend/tests/unit/services/test_job_service.py +++ b/application/backend/tests/unit/services/test_job_service.py @@ -1,9 +1,11 @@ # Copyright (C) 2025 Intel Corporation # SPDX-License-Identifier: Apache-2.0 import asyncio +from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest +from freezegun import freeze_time from sqlalchemy.exc import IntegrityError from exceptions import DuplicateJobException, ResourceNotFoundException @@ -147,14 +149,25 @@ def test_get_pending_train_job(self, fxt_job_repository, fxt_job, job_exists, ex fxt_job_repository.get_pending_job_by_type.assert_called_once_with(JobType.TRAINING) @pytest.mark.parametrize( - "has_message,message,expected_updates", + "has_message,message", [ - (True, "Test message", {"status": JobStatus.COMPLETED, "message": "Test message", "progress": 100}), - (False, None, {"status": JobStatus.COMPLETED, "progress": 100}), + (True, "Test message"), + (False, None), ], ) - def test_update_job_status_success(self, fxt_job_repository, fxt_job, has_message, message, expected_updates): + @freeze_time("2025-01-01 00:00:00") + def test_update_job_status_success(self, fxt_job_repository, fxt_job, has_message, message): """Test updating job status successfully with and without message.""" + # Expected updates include end_time since status is COMPLETED + frozen_time = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + expected_updates = { + "status": JobStatus.COMPLETED, + "end_time": frozen_time, + "progress": 100, + } + if has_message: + expected_updates["message"] = message + # Create an updated job object that the repository would return updated_job = fxt_job.model_copy(update=expected_updates) fxt_job_repository.get_by_id.return_value = fxt_job diff --git a/application/backend/uv.lock b/application/backend/uv.lock index d38de8c58d..5e1109f83e 100644 --- a/application/backend/uv.lock +++ b/application/backend/uv.lock @@ -1317,6 +1317,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/58/8acf1b3e91c58313ce5cb67df61001fc9dcd21be4fadb76c1a2d540e09ed/fqdn-1.5.1-py3-none-any.whl", hash = "sha256:3a179af3761e4df6eb2e026ff9e1a3033d3587bf980a0b1b2e1e5d08d7358014", size = 9121, upload-time = "2021-03-11T07:16:28.351Z" }, ] +[[package]] +name = "freezegun" +version = "1.5.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/dd/23e2f4e357f8fd3bdff613c1fe4466d21bfb00a6177f238079b17f7b1c84/freezegun-1.5.5.tar.gz", hash = "sha256:ac7742a6cc6c25a2c35e9292dfd554b897b517d2dec26891a2e8debf205cb94a", size = 35914, upload-time = "2025-08-09T10:39:08.338Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/2e/b41d8a1a917d6581fc27a35d05561037b048e47df50f27f8ac9c7e27a710/freezegun-1.5.5-py3-none-any.whl", hash = "sha256:cd557f4a75cf074e84bc374249b9dd491eaeacd61376b9eb3c423282211619d2", size = 19266, upload-time = "2025-08-09T10:39:06.636Z" }, +] + [[package]] name = "freia" version = "0.2" @@ -1460,6 +1472,7 @@ mqtt = [ [package.dev-dependencies] dev = [ { name = "flaky" }, + { name = "freezegun" }, { name = "httpx" }, { name = "pdbpp" }, { name = "pre-commit" }, @@ -1500,6 +1513,7 @@ provides-extras = ["mqtt"] [package.metadata.requires-dev] dev = [ { name = "flaky", specifier = "~=3.8" }, + { name = "freezegun", specifier = "~=1.5" }, { name = "httpx", specifier = "~=0.24" }, { name = "pdbpp", specifier = "~=0.10" }, { name = "pre-commit", specifier = "~=4.1" },