Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dev = [
"flaky~=3.8",
"testfixtures~=8.2",
"httpx~=0.24",
"freezegun~=1.5",
]
lint = [
"ruff~=0.11.2",
Expand Down
6 changes: 3 additions & 3 deletions application/backend/src/api/endpoints/job_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from uuid import UUID

from fastapi import APIRouter, Body, Depends
from fastapi.responses import StreamingResponse
from sse_starlette import EventSourceResponse

from api.dependencies import get_job_id, get_job_service
from api.endpoints import API_PREFIX
Expand Down Expand Up @@ -39,6 +39,6 @@ async def submit_train_job(
async def get_job_logs(
job_id: Annotated[UUID, Depends(get_job_id)],
job_service: Annotated[JobService, Depends(get_job_service)],
) -> StreamingResponse:
) -> EventSourceResponse:
"""Endpoint to get the logs of a job by its ID"""
return StreamingResponse(job_service.stream_logs(job_id=job_id), media_type="text/event-stream")
return EventSourceResponse(job_service.stream_logs(job_id=job_id))
9 changes: 4 additions & 5 deletions application/backend/src/services/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import asyncio
import datetime
import os
from collections.abc import AsyncGenerator
from uuid import UUID

import anyio
from sqlalchemy.exc import IntegrityError
from sse_starlette import ServerSentEvent

from db import get_async_db_session_ctx
from exceptions import DuplicateJobException, ResourceNotFoundException
Expand Down Expand Up @@ -75,7 +77,7 @@ async def update_job_status(
await repo.update(job, updates)

@classmethod
async def stream_logs(cls, job_id: UUID | str):
async def stream_logs(cls, job_id: UUID | str) -> AsyncGenerator[ServerSentEvent, None]:
from core.logging.utils import get_job_logs_path

log_file = get_job_logs_path(job_id=job_id)
Expand Down Expand Up @@ -110,8 +112,5 @@ async def is_job_still_running():
continue
# No more lines are expected
else:
yield "data: DONE\n\n"
break

# Format as an SSE message
yield f"data: {line.rstrip()}\n\n"
yield ServerSentEvent(data=line.rstrip())
7 changes: 4 additions & 3 deletions application/backend/tests/unit/endpoints/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from fastapi import status
from sse_starlette import ServerSentEvent

from api.dependencies import get_job_service
from main import app
Expand Down Expand Up @@ -41,8 +42,8 @@ def test_get_job_logs_success(fxt_client, fxt_job_service, fxt_job):

# Mock the stream_logs generator
async def mock_stream():
yield '{"level": "INFO", "message": "Line 1"}\n'
yield '{"level": "INFO", "message": "Line 2"}\n'
yield ServerSentEvent(data='{"level": "INFO", "message": "Line 1"}')
yield ServerSentEvent(data='{"level": "INFO", "message": "Line 2"}')

fxt_job_service.stream_logs.return_value = mock_stream()

Expand All @@ -52,7 +53,7 @@ async def mock_stream():
# Verify the streamed content
content = response.content.decode("utf-8")
lines = [line for line in content.split("\n") if line]
assert len(lines) == 2
assert len(lines) == 4 # 2 events + 2 newlines
assert '"level": "INFO"' in lines[0]
assert '"message": "Line 1"' in lines[0]

Expand Down
25 changes: 19 additions & 6 deletions application/backend/tests/unit/services/test_job_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from freezegun import freeze_time
from sqlalchemy.exc import IntegrityError

from exceptions import DuplicateJobException, ResourceNotFoundException
Expand Down Expand Up @@ -147,14 +149,25 @@ def test_get_pending_train_job(self, fxt_job_repository, fxt_job, job_exists, ex
fxt_job_repository.get_pending_job_by_type.assert_called_once_with(JobType.TRAINING)

@pytest.mark.parametrize(
"has_message,message,expected_updates",
"has_message,message",
[
(True, "Test message", {"status": JobStatus.COMPLETED, "message": "Test message", "progress": 100}),
(False, None, {"status": JobStatus.COMPLETED, "progress": 100}),
(True, "Test message"),
(False, None),
],
)
def test_update_job_status_success(self, fxt_job_repository, fxt_job, has_message, message, expected_updates):
@freeze_time("2025-01-01 00:00:00")
def test_update_job_status_success(self, fxt_job_repository, fxt_job, has_message, message):
"""Test updating job status successfully with and without message."""
# Expected updates include end_time since status is COMPLETED
frozen_time = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
expected_updates = {
"status": JobStatus.COMPLETED,
"end_time": frozen_time,
"progress": 100,
}
if has_message:
expected_updates["message"] = message

# Create an updated job object that the repository would return
updated_job = fxt_job.model_copy(update=expected_updates)
fxt_job_repository.get_by_id.return_value = fxt_job
Expand Down Expand Up @@ -206,7 +219,7 @@ async def consume_stream():

def test_stream_logs_success(self, fxt_job_repository, fxt_job):
"""Test streaming logs successfully from a completed job."""
log_lines = ['{"level": "INFO", "message": "Line 1"}\n', '{"level": "INFO", "message": "Line 2"}\n']
log_lines = ['{"level": "INFO", "message": "Line 1"}', '{"level": "INFO", "message": "Line 2"}']

# Mock job as completed
completed_job = fxt_job.model_copy(update={"status": JobStatus.COMPLETED})
Expand Down Expand Up @@ -240,7 +253,7 @@ async def mock_anyio_open_file(*args, **kwargs):
async def consume_stream():
result = []
async for line in JobService.stream_logs(fxt_job.id):
result.append(line)
result.append(line.data)
return result

result = asyncio.run(consume_stream())
Expand Down
14 changes: 14 additions & 0 deletions application/backend/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading