Skip to content

Commit ecd4795

Browse files
committed
ref: make processors from tasks (#62)
1 parent c9e076d commit ecd4795

File tree

12 files changed

+118
-171
lines changed

12 files changed

+118
-171
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from asyncio import sleep
2+
from dataclasses import dataclass
3+
4+
from structlog.types import FilteringBoundLogger
5+
6+
from ttt.application.invitation_to_game.game.auto_cancel_invitations_to_game import ( # noqa: E501
7+
AutoCancelInvitationsToGame,
8+
)
9+
from ttt.infrastructure.dishka.next_container import NextContainer
10+
from ttt.infrastructure.processors.processor import Processor
11+
from ttt.infrastructure.retrier import Retrier
12+
from ttt.infrastructure.structlog.logger import unexpected_error_logging
13+
14+
15+
@dataclass(frozen=True)
16+
class AutoCancelInvitationsToGameProcessor(Processor):
17+
_interval_seconds: float
18+
_logger: FilteringBoundLogger
19+
20+
async def __call__(self, container: NextContainer) -> None:
21+
while True:
22+
await sleep(self._interval_seconds)
23+
async with unexpected_error_logging(self._logger): # noqa: SIM117
24+
async with container() as request:
25+
retrier = await request.get(Retrier)
26+
cancel_invitations = await request.get(
27+
AutoCancelInvitationsToGame,
28+
)
29+
await retrier(cancel_invitations)

src/ttt/presentation/tasks/matchmake_tasks.py renamed to src/ttt/infrastructure/processors/matchmake_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
from ttt.application.user.game.matchmake import Matchmake
77
from ttt.infrastructure.dishka.next_container import NextContainer
8+
from ttt.infrastructure.processors.processor import Processor
89
from ttt.infrastructure.retrier import Retrier
910
from ttt.infrastructure.structlog.logger import unexpected_error_log
10-
from ttt.presentation.tasks.task import Task
1111

1212

1313
@dataclass
14-
class MatchmakeTasks(Task):
14+
class MatchmakeProcessor(Processor):
1515
_max_workers: int
1616
_worker_creation_interval_seconds: float
1717
_logger: FilteringBoundLogger

src/ttt/presentation/tasks/task.py renamed to src/ttt/infrastructure/processors/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
from ttt.infrastructure.dishka.next_container import NextContainer
44

55

6-
class Task(Protocol):
6+
class Processor(Protocol):
77
async def __call__(self, container: NextContainer, /) -> Any: ... # noqa: ANN401
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from asyncio import gather
2+
from collections.abc import Sequence
3+
from dataclasses import dataclass
4+
5+
from ttt.infrastructure.dishka.next_container import NextContainer
6+
from ttt.infrastructure.processors.processor import Processor
7+
8+
9+
@dataclass(frozen=True, unsafe_hash=False)
10+
class Processors(Processor):
11+
_processors: Sequence[Processor]
12+
13+
async def __call__(self, container: NextContainer) -> None:
14+
await gather(*(processor(container) for processor in self._processors))

src/ttt/infrastructure/structlog/logger.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import logging
2+
from collections.abc import AsyncIterator
3+
from contextlib import asynccontextmanager
24
from typing import cast
35

46
import structlog
@@ -49,3 +51,13 @@ async def unexpected_error_log(
4951
error: Exception,
5052
) -> None:
5153
await logger.aexception("unexpected_error", exc_info=error)
54+
55+
56+
@asynccontextmanager
57+
async def unexpected_error_logging(
58+
logger: FilteringBoundLogger,
59+
) -> AsyncIterator[None]:
60+
try:
61+
yield
62+
except Exception as error: # noqa: BLE001
63+
await unexpected_error_log(logger, error)

src/ttt/main/common/di.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from asyncio import Queue
22
from collections.abc import AsyncIterator
3+
from typing import Annotated
34

4-
from dishka import Provider, Scope, provide
5+
from dishka import FromComponent, Provider, Scope, provide
56
from nats import connect as connect_to_nats
67
from nats.aio.client import Client as Nats
78
from nats.js import JetStreamContext
@@ -11,6 +12,7 @@
1112
AsyncSession,
1213
create_async_engine,
1314
)
15+
from structlog.types import FilteringBoundLogger
1416
from taskiq.receiver import Receiver
1517

1618
from ttt.application.common.errors.serialization_error import SerializationError
@@ -105,6 +107,11 @@
105107
from ttt.infrastructure.adapters.users import InPostgresUsers
106108
from ttt.infrastructure.adapters.uuids import UUIDv4s
107109
from ttt.infrastructure.openai.gemini import Gemini, gemini
110+
from ttt.infrastructure.processors.auto_cancel_invitations_to_game_processor import ( # noqa: E501
111+
AutoCancelInvitationsToGameProcessor,
112+
)
113+
from ttt.infrastructure.processors.matchmake_processor import MatchmakeProcessor
114+
from ttt.infrastructure.processors.processors import Processors
108115
from ttt.infrastructure.pydantic_settings.envs import Envs
109116
from ttt.infrastructure.pydantic_settings.secrets import Secrets
110117
from ttt.infrastructure.retrier import Retrier
@@ -358,3 +365,43 @@ def provide_retrier(self, envs: Envs) -> Retrier:
358365
})
359366

360367
provide_retry = provide(RetrierRetry, provides=Retry, scope=Scope.REQUEST)
368+
369+
@provide(scope=Scope.APP)
370+
def provide_auto_cancel_invitations_to_game_task(
371+
self,
372+
envs: Envs,
373+
logger: Annotated[FilteringBoundLogger, FromComponent("app")],
374+
) -> AutoCancelInvitationsToGameProcessor:
375+
return AutoCancelInvitationsToGameProcessor(
376+
_interval_seconds=(
377+
envs.auto_cancel_invitations_to_game_interval_seconds
378+
),
379+
_logger=logger,
380+
)
381+
382+
@provide(scope=Scope.APP)
383+
def provide_matchmake_processor(
384+
self,
385+
envs: Envs,
386+
logger: Annotated[FilteringBoundLogger, FromComponent("app")],
387+
) -> MatchmakeProcessor:
388+
return MatchmakeProcessor(
389+
_max_workers=envs.matchmaking_max_workers,
390+
_worker_creation_interval_seconds=(
391+
envs.matchmaking_worker_creation_interval_seconds
392+
),
393+
_logger=logger,
394+
)
395+
396+
@provide(scope=Scope.APP)
397+
async def processors(
398+
self,
399+
auto_cancel_invitations_to_game_processor: (
400+
AutoCancelInvitationsToGameProcessor
401+
),
402+
matchmake_processor: MatchmakeProcessor,
403+
) -> Processors:
404+
return Processors((
405+
auto_cancel_invitations_to_game_processor,
406+
matchmake_processor,
407+
))

src/ttt/main/tg_bot/di.py

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from collections.abc import AsyncIterator
22
from dataclasses import dataclass
3-
from typing import Annotated, cast
3+
from typing import cast
44

55
from aiogram import Bot, Dispatcher
66
from aiogram.fsm.storage.base import BaseStorage, DefaultKeyBuilder
@@ -15,15 +15,13 @@
1515
from aiogram_dialog.manager.bg_manager import BgManagerFactoryImpl
1616
from aiogram_dialog.manager.manager import ManagerImpl
1717
from dishka import (
18-
FromComponent,
1918
Provider,
2019
Scope,
2120
from_context,
2221
provide,
2322
)
2423
from dishka.integrations.aiogram import AiogramMiddlewareData
2524
from redis.asyncio import Redis
26-
from structlog.types import FilteringBoundLogger
2725

2826
from ttt.application.common.ports.emojis import Emojis
2927
from ttt.application.game.game.cancel_game import CancelGame
@@ -120,7 +118,6 @@
120118
from ttt.application.user.view_other_user import ViewOtherUser
121119
from ttt.application.user.view_user import ViewUser
122120
from ttt.application.user.view_user_emojis import ViewUserEmojis
123-
from ttt.infrastructure.pydantic_settings.envs import Envs
124121
from ttt.infrastructure.pydantic_settings.secrets import Secrets
125122
from ttt.presentation.adapters.emojis import PictographsAsEmojis
126123
from ttt.presentation.adapters.game_views import (
@@ -151,12 +148,6 @@
151148
)
152149
from ttt.presentation.aiogram_dialog.main_dialog import main_dialog
153150
from ttt.presentation.result_buffer import ResultBuffer
154-
from ttt.presentation.tasks.auto_cancel_invitations_to_game_task import (
155-
AutoCancelInvitationsToGameTask,
156-
)
157-
from ttt.presentation.tasks.matchmake_tasks import MatchmakeTasks
158-
from ttt.presentation.tasks.unkillable_tasks import UnkillableTasks
159-
from ttt.presentation.unkillable_task_group import UnkillableTaskGroup
160151

161152

162153
@dataclass
@@ -253,50 +244,6 @@ async def provide_bot(self, secrets: Secrets) -> AsyncIterator[Bot]:
253244
def provide_result_buffer(self) -> ResultBuffer:
254245
return ResultBuffer()
255246

256-
@provide(scope=Scope.APP)
257-
def provide_auto_cancel_invitations_to_game_task(
258-
self, envs: Envs,
259-
) -> AutoCancelInvitationsToGameTask:
260-
return AutoCancelInvitationsToGameTask(
261-
_interval_seconds=(
262-
envs.auto_cancel_invitations_to_game_interval_seconds
263-
),
264-
)
265-
266-
@provide(scope=Scope.APP)
267-
def provide_matchmake_tasks(
268-
self,
269-
envs: Envs,
270-
logger: Annotated[FilteringBoundLogger, FromComponent("app")],
271-
) -> MatchmakeTasks:
272-
return MatchmakeTasks(
273-
_max_workers=envs.matchmaking_max_workers,
274-
_worker_creation_interval_seconds=(
275-
envs.matchmaking_worker_creation_interval_seconds
276-
),
277-
_logger=logger,
278-
)
279-
280-
@provide(scope=Scope.APP)
281-
async def unkillable_task_group(
282-
self, logger: Annotated[FilteringBoundLogger, FromComponent("app")],
283-
) -> AsyncIterator[UnkillableTaskGroup]:
284-
async with UnkillableTaskGroup(logger) as group:
285-
yield group
286-
287-
@provide(scope=Scope.APP)
288-
async def unkillable_tasks(
289-
self,
290-
task_group: UnkillableTaskGroup,
291-
auto_cancel_invitations_to_game_task: AutoCancelInvitationsToGameTask,
292-
matchmake_tasks: MatchmakeTasks,
293-
) -> UnkillableTasks:
294-
tasks = (
295-
auto_cancel_invitations_to_game_task,
296-
matchmake_tasks,
297-
)
298-
return UnkillableTasks(tasks, task_group)
299-
300247
@provide(scope=Scope.APP)
301248
def provide_dp(self, storage: BaseStorage) -> Dispatcher:
302249
dp = Dispatcher(name="main", storage=storage)

src/ttt/main/tg_bot/start_tg_bot.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
import logging
2+
from asyncio import CancelledError, TaskGroup
3+
from contextlib import suppress
24

35
from aiogram import Bot, Dispatcher
46
from aiogram.types import TelegramObject
57
from dishka import AsyncContainer
6-
from dishka.integrations.aiogram import (
7-
AiogramMiddlewareData,
8-
)
8+
from dishka.integrations.aiogram import AiogramMiddlewareData
99
from taskiq import TaskiqMessage
1010

11+
from ttt.infrastructure.processors.processors import Processors
1112
from ttt.infrastructure.taskiq.broker import NatsBroker
1213
from ttt.infrastructure.taskiq.middlewares import TaskiqNextContainerMiddleware
1314
from ttt.infrastructure.taskiq.worker import TaskiqBgWorker
1415
from ttt.main.common.next_container import NextContainerWithFilledContext
1516
from ttt.presentation.aiogram.common.middlewares import (
1617
AiogramNextContainerMiddleware,
1718
)
18-
from ttt.presentation.tasks.unkillable_tasks import UnkillableTasks
1919

2020

2121
async def start_tg_bot(container: AsyncContainer) -> None:
@@ -33,14 +33,17 @@ async def start_tg_bot(container: AsyncContainer) -> None:
3333
nats_broker.add_middlewares(TaskiqNextContainerMiddleware(next_container))
3434

3535
taskiq_bg_worker = await container.get(TaskiqBgWorker)
36-
tasks = await container.get(UnkillableTasks)
36+
processors = await container.get(Processors)
3737
bot = await container.get(Bot)
3838

3939
logging.basicConfig(level=logging.INFO)
4040

4141
try:
42-
await tasks(next_container)
43-
await taskiq_bg_worker()
44-
await dp.start_polling(bot)
42+
with suppress(CancelledError):
43+
async with TaskGroup() as tasks:
44+
tasks.create_task(processors(next_container))
45+
await taskiq_bg_worker()
46+
await dp.start_polling(bot)
47+
raise CancelledError
4548
finally:
4649
await container.close()

src/ttt/presentation/tasks/auto_cancel_invitations_to_game_task.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)