Skip to content

Commit 0d24ac4

Browse files
committed
fix: make it launchable (#62)
1 parent 4e66648 commit 0d24ac4

File tree

18 files changed

+318
-107
lines changed

18 files changed

+318
-107
lines changed

deploy/dev/nats/add_streams.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
#!/bin/bash
22

3-
nats -s nats://nats:4222 stream add --config /mnt/streams/user.json
3+
for stream_config_file in `ls -lx /mnt/streams`; do
4+
nats -s nats://nats:4222 stream add --config /mnt/streams/$stream_config_file
5+
done

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ ignore = [
117117

118118
[tool.ruff.lint.per-file-ignores]
119119
"src/ttt/infrastructure/sqlalchemy/tables/__init__.py" = ["F401"]
120+
"src/ttt/infrastructure/taskiq/tasks/__init__.py" = ["F401"]
120121
"src/ttt/infrastructure/adapters/*" = ["ARG002"]
121122
"src/ttt/presentation/adapters/*" = ["ARG002"]
122123
"src/ttt/presentation/*" = ["RUF001"]

src/ttt/application/game/game/make_ai_move_in_game.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
class MakeAiMoveInGame:
2727
map_: Map
2828
games: Games
29-
game_views: GameViews
30-
users: Users
29+
views: GameViews
3130
uuids: UUIDs
3231
randoms: Randoms
3332
ai_gateway: GameAiGateway
@@ -77,4 +76,4 @@ async def __call__(self, user_id: int, game_id: UUID, ai_id: UUID) -> None:
7776
await self.map_(tracking)
7877
await self.transaction.commit()
7978

80-
await self.game_views.game_view(game)
79+
await self.views.game_view(game)

src/ttt/application/game/game/ports/game_views.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,6 @@ async def no_current_game_view(
2525
/,
2626
) -> None: ...
2727

28-
@abstractmethod
29-
async def no_game_with_id_view(
30-
self,
31-
game_id: UUID,
32-
/,
33-
) -> None: ...
34-
3528
@abstractmethod
3629
async def game_already_complteted_view(
3730
self,

src/ttt/infrastructure/adapters/transaction.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,16 @@ async def __aexit__(
3232
error: BaseException | None,
3333
traceback: TracebackType | None,
3434
) -> None:
35-
transaction = not_none(self._session.get_transaction())
35+
transaction = self._session.get_transaction()
36+
37+
if transaction is None:
38+
return
39+
3640
with reraise_serialization_error():
37-
await transaction.__aexit__(error_type, error, traceback)
41+
if error is None:
42+
await transaction.commit()
43+
else:
44+
await transaction.rollback()
3845

3946
async def commit(self) -> None:
4047
transaction = not_none(self._session.get_transaction())
@@ -49,7 +56,7 @@ class InPostgresNotSerializableTransaction(NotSerializableTransaction):
4956
async def __aenter__(self) -> Self:
5057
assert_(not self._session.in_transaction())
5158
await self._session.connection(
52-
execution_options={"isolation_level": "READ COMMITED"},
59+
execution_options={"isolation_level": "READ COMMITTED"},
5360
)
5461
return self
5562

@@ -59,8 +66,15 @@ async def __aexit__(
5966
error: BaseException | None,
6067
traceback: TracebackType | None,
6168
) -> None:
62-
transaction = not_none(self._session.get_transaction())
63-
await transaction.__aexit__(error_type, error, traceback)
69+
transaction = self._session.get_transaction()
70+
71+
if transaction is None:
72+
return
73+
74+
if error is None:
75+
await transaction.commit()
76+
else:
77+
await transaction.rollback()
6478

6579
async def commit(self) -> None:
6680
transaction = not_none(self._session.get_transaction())
@@ -83,5 +97,12 @@ async def __aexit__(
8397
error: BaseException | None,
8498
traceback: TracebackType | None,
8599
) -> None:
86-
transaction = not_none(self._session.get_transaction())
87-
await transaction.__aexit__(error_type, error, traceback)
100+
transaction = self._session.get_transaction()
101+
102+
if transaction is None:
103+
return
104+
105+
if error is None:
106+
await transaction.commit()
107+
else:
108+
await transaction.rollback()

src/ttt/infrastructure/adapters/user_locks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from pydantic.dataclasses import dataclass
1+
from dataclasses import dataclass
2+
23
from sqlalchemy import select
34
from sqlalchemy.ext.asyncio import AsyncSession
45

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""
2+
remove `IS TRUE` from `ix_users_has_matchmaking_waiting`.
3+
4+
Revision ID: 2dcb2be9e277
5+
Revises: 6642791b3bf8
6+
Create Date: 2025-09-24 13:46:33.322702
7+
8+
"""
9+
10+
from collections.abc import Sequence
11+
12+
from alembic import op
13+
14+
15+
revision: str = "2dcb2be9e277"
16+
down_revision: str | None = "6642791b3bf8"
17+
branch_labels: str | Sequence[str] | None = None
18+
depends_on: str | Sequence[str] | None = None
19+
20+
21+
def upgrade() -> None:
22+
op.drop_index(
23+
op.f("ix_users_has_matchmaking_waiting"),
24+
table_name="users",
25+
postgresql_where="(has_matchmaking_waiting IS TRUE)",
26+
)
27+
op.create_index(
28+
op.f("ix_users_has_matchmaking_waiting"),
29+
"users",
30+
["has_matchmaking_waiting"],
31+
unique=False,
32+
postgresql_where="has_matchmaking_waiting",
33+
)
34+
35+
36+
def downgrade() -> None:
37+
op.drop_index(op.f("ix_users_has_matchmaking_waiting"), table_name="users")
38+
op.create_index(
39+
op.f("ix_users_has_matchmaking_waiting"),
40+
"users",
41+
["has_matchmaking_waiting"],
42+
unique=False,
43+
postgresql_where="(has_matchmaking_waiting IS TRUE)",
44+
)

src/ttt/infrastructure/sqlalchemy/tables/user.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class TableUser(Base[User]):
122122
Index(
123123
"ix_users_has_matchmaking_waiting",
124124
has_matchmaking_waiting,
125-
postgresql_where=(has_matchmaking_waiting.is_(True)),
125+
postgresql_where="has_matchmaking_waiting",
126126
),
127127
)
128128

Lines changed: 143 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,174 @@
1-
from collections.abc import AsyncGenerator
2-
from typing import NewType, Protocol
1+
from asyncio import Queue, TaskGroup, gather
2+
from collections.abc import AsyncGenerator, Awaitable, Callable
3+
from dataclasses import dataclass, field
4+
from typing import Any, overload
35

46
from nats.aio.msg import Msg as NatsMessage
57
from nats.errors import TimeoutError as NatsTimeoutError
68
from nats.js import JetStreamContext
79
from taskiq import (
810
AckableMessage,
911
AsyncBroker,
12+
AsyncTaskiqDecoratedTask,
1013
BrokerMessage,
1114
)
1215

1316

14-
class PullSubscribe(Protocol):
17+
@dataclass(frozen=True)
18+
class PullSubscribe:
19+
_func: Callable[
20+
[JetStreamContext, str], Awaitable[JetStreamContext.PullSubscription],
21+
]
22+
1523
async def __call__(
16-
self, js: JetStreamContext, subject: str, /,
17-
) -> JetStreamContext.PullSubscription: ...
24+
self, js: JetStreamContext, subject: str,
25+
) -> JetStreamContext.PullSubscription:
26+
return await self._func(js, subject)
1827

1928

20-
class NatsBroker(AsyncBroker):
21-
_consumer: JetStreamContext.PullSubscription
22-
js: JetStreamContext
29+
@dataclass
30+
class NatsQueue:
31+
_subject: str
32+
_pull_subscribe: PullSubscribe
33+
_pull_consume_batch: int
34+
_pull_consume_timeout: float | None
2335

24-
def __init__(
25-
self,
26-
subject: str,
27-
pull_subscribe: PullSubscribe,
28-
pull_consume_batch: int = 1,
29-
pull_consume_timeout: float | None = 5,
30-
) -> None:
31-
super().__init__()
32-
self._subject = subject
33-
self._pull_subscribe = pull_subscribe
34-
self._pull_consume_batch = pull_consume_batch
35-
self._pull_consume_timeout = pull_consume_timeout
36+
_js: JetStreamContext = field(init=False)
37+
_pull_subscription: JetStreamContext.PullSubscription = field(init=False)
3638

37-
async def startup(self) -> None:
38-
await super().startup()
39-
self._consumer = await self._pull_subscribe(self.js, self._subject)
39+
async def startup(self, js: JetStreamContext) -> None:
40+
self._js = js
41+
self._pull_subscription = await self._pull_subscribe(js, self._subject)
4042

41-
async def kick(self, message: BrokerMessage) -> None:
42-
await self.js.publish(
43+
async def push(self, message: BrokerMessage) -> None:
44+
await self._js.publish(
4345
self._subject,
4446
payload=message.message,
4547
headers=message.labels,
4648
)
4749

48-
async def listen(self) -> AsyncGenerator[AckableMessage]:
50+
async def pull_to(self, output: Queue[AckableMessage]) -> None:
51+
nats_messages: list[NatsMessage]
52+
4953
while True:
5054
try:
51-
nats_messages: list[NatsMessage] = await self._consumer.fetch(
55+
nats_messages = await self._pull_subscription.fetch(
5256
batch=self._pull_consume_batch,
5357
timeout=self._pull_consume_timeout,
5458
)
55-
for nats_message in nats_messages:
56-
yield AckableMessage(
57-
data=nats_message.data,
58-
ack=nats_message.ack,
59-
)
6059
except NatsTimeoutError:
6160
continue
6261

62+
ackable_messages = (
63+
AckableMessage(
64+
data=nats_message.data,
65+
ack=nats_message.ack,
66+
)
67+
for nats_message in nats_messages
68+
)
69+
await gather(*(
70+
output.put(ackable_message)
71+
for ackable_message in ackable_messages
72+
))
73+
74+
75+
class NatsBroker(AsyncBroker):
76+
js: JetStreamContext
77+
pulling_queue: Queue[AckableMessage]
78+
79+
def __init__(
80+
self,
81+
default_subject: str = "taskiq.>",
82+
default_pull_subscribe: PullSubscribe | None = None,
83+
default_pull_consume_batch: int = 1,
84+
default_pull_consume_timeout: float | None = 5,
85+
) -> None:
86+
super().__init__()
87+
self._default_subject = default_subject
88+
self._default_pull_subscribe = (
89+
default_pull_subscribe
90+
or (lambda js, sub: js.pull_subscribe(
91+
sub,
92+
"taskiq",
93+
"TASKIQ",
94+
))
95+
)
96+
self._default_pull_consume_batch = default_pull_consume_batch
97+
self._default_pull_consume_timeout = default_pull_consume_timeout
98+
self._queue_by_task_name = dict[str, NatsQueue]()
99+
100+
@overload
101+
def task[**PmT, RT](
102+
self,
103+
task_name: Callable[PmT, RT],
104+
**labels: Any, # noqa: ANN401
105+
) -> AsyncTaskiqDecoratedTask[PmT, RT]:
106+
...
107+
108+
@overload
109+
def task[**PmT, RT](
110+
self,
111+
task_name: str | None = None,
112+
**labels: Any, # noqa: ANN401
113+
) -> Callable[
114+
[Callable[PmT, RT]],
115+
AsyncTaskiqDecoratedTask[PmT, RT],
116+
]:
117+
...
118+
119+
def task[**PmT, RT](
120+
self,
121+
task_name: str | Callable[PmT, RT] | None = None,
122+
**labels: Any,
123+
) -> Any:
124+
subject = labels.pop("subject", self._default_subject)
125+
pull_subscribe = labels.pop(
126+
"pull_subscribe", self._default_pull_subscribe,
127+
)
128+
pull_consume_batch = labels.pop(
129+
"pull_consume_batch", self._default_pull_consume_batch,
130+
)
131+
pull_consume_timeout = labels.pop(
132+
"pull_consume_timeout", self._default_pull_consume_timeout,
133+
)
134+
queue = NatsQueue(
135+
subject,
136+
pull_subscribe,
137+
pull_consume_batch,
138+
pull_consume_timeout,
139+
)
140+
141+
result = super().task(task_name, **labels)
142+
143+
if isinstance(result, AsyncTaskiqDecoratedTask):
144+
self._queue_by_task_name[result.task_name] = queue
145+
return result
146+
147+
def decorator(
148+
func: Callable[PmT, RT],
149+
) -> AsyncTaskiqDecoratedTask[PmT, RT]:
150+
task = result(func)
151+
self._queue_by_task_name[task.task_name] = queue
152+
return task
153+
154+
return decorator
155+
156+
async def startup(self) -> None:
157+
await super().startup()
158+
await gather(*(
159+
queue.startup(self.js)
160+
for queue in self._queue_by_task_name.values()
161+
))
162+
163+
async def kick(self, message: BrokerMessage) -> None:
164+
await self._queue_by_task_name[message.task_name].push(message)
165+
166+
async def listen(self) -> AsyncGenerator[AckableMessage]:
167+
async with TaskGroup() as pulling_tasks:
168+
for nats_queue in self._queue_by_task_name.values():
169+
pulling_tasks.create_task(
170+
nats_queue.pull_to(self.pulling_queue),
171+
)
63172

64-
NatsBrokers = NewType("NatsBrokers", tuple[NatsBroker, ...])
173+
while True:
174+
yield await self.pulling_queue.get()
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from ttt.infrastructure.taskiq.tasks.common import nats_tasks
2+
from ttt.infrastructure.taskiq.tasks.complete_stars_purchase_payment_task import ( # noqa: E501
3+
complete_stars_purchase_payment_task,
4+
)
5+
from ttt.infrastructure.taskiq.tasks.make_ai_move_in_game_task import (
6+
make_ai_move_in_game_broker_task,
7+
)

0 commit comments

Comments
 (0)