Skip to content

Commit 6a76fa2

Browse files
committed
ref: make infinite interactors not infinite (#61)
1 parent 10d2312 commit 6a76fa2

29 files changed

+324
-321
lines changed

deploy/dev/docker-compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ services:
3939
TTT_MATCHMAKING_MAX_WORKERS: 4
4040
TTT_MATCHMAKING_WORKER_MAX_USERS: 100
4141
TTT_MATCHMAKING_WORKER_CREATION_INTERVAL_SECONDS: 0.5
42+
43+
TTT_AUTO_CANCEL_INVITATIONS_TO_GAME_INTERVAL_SECONDS: 1
4244
secrets:
4345
- secrets
4446
command: ttt-dev

deploy/prod/docker-compose.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ services:
3535
TTT_NATS_URL: nats://${NATS_TOKEN}@nats:4222
3636

3737
TTT_GEMINI_URL: ${GEMINI_URL}
38+
39+
TTT_MATCHMAKING_MAX_WORKERS: 4
40+
TTT_MATCHMAKING_WORKER_MAX_USERS: 100
41+
TTT_MATCHMAKING_WORKER_CREATION_INTERVAL_SECONDS: 0.5
42+
43+
TTT_AUTO_CANCEL_INVITATIONS_TO_GAME_INTERVAL_SECONDS: 1
3844
secrets:
3945
- secrets
4046
networks:

src/ttt/application/stars_purchase/complete_stars_purchase_payment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class CompleteStarsPurchasePayment:
3232
stars_purchases: StarsPurchases
3333

3434
async def __call__(self) -> None:
35-
async for paid_payment in self.inbox.stream():
35+
async for paid_payment in self.inbox:
3636
current_datetime = await self.clock.current_datetime()
3737

3838
async with self.transaction:
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from collections.abc import AsyncIterable
2+
from collections.abc import AsyncIterator
33

44
from ttt.application.stars_purchase.dto.common import PaidStarsPurchasePayment
55

@@ -9,4 +9,4 @@ class PaidStarsPurchasePaymentInbox(ABC):
99
async def push(self, payment: PaidStarsPurchasePayment) -> None: ...
1010

1111
@abstractmethod
12-
def stream(self) -> AsyncIterable[PaidStarsPurchasePayment]: ...
12+
def __aiter__(self) -> AsyncIterator[PaidStarsPurchasePayment]: ...

src/ttt/application/stars_purchase/ports/stars_purchase_payment_gateway.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from abc import ABC, abstractmethod
2-
from collections.abc import AsyncIterable
32
from uuid import UUID
43

5-
from ttt.application.stars_purchase.dto.common import PaidStarsPurchasePayment
64
from ttt.entities.core.stars_purchase.stars_purchase import StarsPurchase
75

86

@@ -18,8 +16,3 @@ async def stop_payment_due_to_dublicate(self, payment_id: UUID) -> None: ...
1816

1917
@abstractmethod
2018
async def stop_payment_due_to_error(self, payment_id: UUID) -> None: ...
21-
22-
@abstractmethod
23-
def paid_payment_stream(
24-
self,
25-
) -> AsyncIterable[PaidStarsPurchasePayment]: ...

src/ttt/application/stars_purchase/start_stars_purchase_payment_completion.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from dataclasses import dataclass
22

3+
from ttt.application.stars_purchase.dto.common import PaidStarsPurchasePayment
34
from ttt.application.stars_purchase.ports.paid_stars_purchase_payment_inbox import ( # noqa: E501
45
PaidStarsPurchasePaymentInbox,
56
)
@@ -21,12 +22,11 @@ class StartStarsPurchasePaymentCompletion:
2122
views: StarsPurchaseViews
2223
log: StarsPurchaseLog
2324

24-
async def __call__(self) -> None:
25-
async for paid_payment in self.payment_gateway.paid_payment_stream():
26-
await self.inbox.push(paid_payment)
27-
await self.views.stars_purchase_will_be_completed_view(
28-
paid_payment.user_id,
29-
)
30-
await self.log.stars_purchase_payment_completion_started(
31-
paid_payment,
32-
)
25+
async def __call__(self, paid_payment: PaidStarsPurchasePayment) -> None:
26+
await self.inbox.push(paid_payment)
27+
await self.log.stars_purchase_payment_completion_started(
28+
paid_payment,
29+
)
30+
await self.views.stars_purchase_will_be_completed_view(
31+
paid_payment.user_id,
32+
)

src/ttt/infrastructure/adapters/paid_stars_purchase_payment_inbox.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from collections.abc import AsyncIterable
1+
from collections.abc import AsyncIterator
22
from dataclasses import dataclass
33

44
from ttt.application.stars_purchase.dto.common import PaidStarsPurchasePayment
@@ -17,6 +17,6 @@ class InNatsPaidStarsPurchasePaymentInbox(PaidStarsPurchasePaymentInbox):
1717
async def push(self, payment: PaidStarsPurchasePayment) -> None:
1818
await self._inbox.push(payment)
1919

20-
async def stream(self) -> AsyncIterable[PaidStarsPurchasePayment]:
20+
async def __aiter__(self) -> AsyncIterator[PaidStarsPurchasePayment]:
2121
async for payment in self._inbox:
2222
yield payment

src/ttt/infrastructure/background_tasks.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/ttt/infrastructure/buffer.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/ttt/infrastructure/nats/messages.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,20 @@
77

88
async def at_least_once_messages(
99
subscription: JetStreamContext.PullSubscription,
10-
batch: int = 1,
10+
max_len: int = 1,
1111
timeout: float | None = 5, # noqa: ASYNC109
1212
heartbeat: float | None = None,
1313
) -> AsyncIterator[Msg]:
14-
while True:
15-
try:
16-
messages = await subscription.fetch(batch, timeout, heartbeat)
17-
except NatsTimeoutError:
18-
continue
14+
try:
15+
messages = await subscription.fetch(max_len, timeout, heartbeat)
16+
except NatsTimeoutError:
17+
return
1918

20-
for message in messages:
21-
try:
22-
yield message
23-
except BaseException as error:
24-
await message.nak()
25-
raise error from error
26-
else:
27-
await message.ack()
19+
for message in messages:
20+
try:
21+
yield message
22+
except BaseException as error:
23+
await message.nak()
24+
raise error from error
25+
else:
26+
await message.ack()

0 commit comments

Comments
 (0)