Skip to content

Commit e0b7dc9

Browse files
committed
refac: Attempt to use PriorityQueue
1 parent f213a52 commit e0b7dc9

File tree

4 files changed

+42
-16
lines changed

4 files changed

+42
-16
lines changed

statemachine/engines/async_.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import heapq
32
from time import time
43
from typing import TYPE_CHECKING
54

@@ -63,8 +62,8 @@ async def processing_loop(self):
6362
first_result = self._sentinel
6463
try:
6564
# Execute the triggers in the queue in FIFO order until the queue is empty
66-
while self._external_queue:
67-
trigger_data = heapq.heappop(self._external_queue)
65+
while self._running and not self.empty():
66+
trigger_data = self.pop()
6867
current_time = time()
6968
if trigger_data.execution_time > current_time:
7069
self.put(trigger_data)
@@ -77,7 +76,7 @@ async def processing_loop(self):
7776
except Exception:
7877
# Whe clear the queue as we don't have an expected behavior
7978
# and cannot keep processing
80-
self._external_queue.clear()
79+
self.clear()
8180
raise
8281
finally:
8382
self._processing.release()

statemachine/engines/base.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import heapq
1+
from queue import PriorityQueue
2+
from queue import Queue
23
from threading import Lock
34
from typing import TYPE_CHECKING
45
from weakref import proxy
@@ -15,19 +16,46 @@
1516

1617
class BaseEngine:
1718
def __init__(self, sm: "StateMachine", rtc: bool = True):
18-
self.sm: StateMachine = proxy(sm)
19-
self._external_queue: list = []
2019
self._sentinel = object()
2120
self._rtc = rtc
22-
self._processing = Lock()
2321
self._running = True
22+
self._init(sm)
23+
24+
def _init(self, sm: "StateMachine"):
25+
self.sm: StateMachine = proxy(sm)
26+
self._external_queue: Queue = PriorityQueue()
27+
self._processing = Lock()
28+
29+
def __getstate__(self) -> dict:
30+
state = self.__dict__.copy()
31+
del state["_external_queue"]
32+
del state["_processing"]
33+
del state["sm"]
34+
return state
35+
36+
def __setstate__(self, state: dict) -> None:
37+
for attr, value in state.items():
38+
setattr(self, attr, value)
39+
40+
def empty(self):
41+
return self._external_queue.qsize() == 0
2442

2543
def put(self, trigger_data: TriggerData):
2644
"""Put the trigger on the queue without blocking the caller."""
2745
if not self._running and not self.sm.allow_event_without_transition:
2846
raise TransitionNotAllowed(trigger_data.event, self.sm.current_state)
2947

30-
heapq.heappush(self._external_queue, trigger_data)
48+
self._external_queue.put(trigger_data)
49+
50+
def pop(self):
51+
try:
52+
return self._external_queue.get(block=False)
53+
except Exception:
54+
return None
55+
56+
def clear(self):
57+
with self._external_queue.mutex:
58+
self._external_queue.queue.clear()
3159

3260
def start(self):
3361
if self.sm.current_state_value is not None:

statemachine/engines/sync.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import heapq
21
from time import sleep
32
from time import time
43
from typing import TYPE_CHECKING
@@ -50,7 +49,7 @@ def processing_loop(self):
5049
"""
5150
if not self._rtc:
5251
# The machine is in "synchronous" mode
53-
trigger_data = heapq.heappop(self._external_queue)
52+
trigger_data = self.pop()
5453
return self._trigger(trigger_data)
5554

5655
# We make sure that only the first event enters the processing critical section,
@@ -64,8 +63,8 @@ def processing_loop(self):
6463
first_result = self._sentinel
6564
try:
6665
# Execute the triggers in the queue in FIFO order until the queue is empty
67-
while self._running and self._external_queue:
68-
trigger_data = heapq.heappop(self._external_queue)
66+
while self._running and not self.empty():
67+
trigger_data = self.pop()
6968
current_time = time()
7069
if trigger_data.execution_time > current_time:
7170
self.put(trigger_data)
@@ -78,7 +77,7 @@ def processing_loop(self):
7877
except Exception:
7978
# Whe clear the queue as we don't have an expected behavior
8079
# and cannot keep processing
81-
self._external_queue.clear()
80+
self.clear()
8281
raise
8382
finally:
8483
self._processing.release()
@@ -137,7 +136,7 @@ def _activate(self, trigger_data: TriggerData, transition: "Transition"): # noq
137136
self.sm._callbacks.call(transition.after.key, *args, **kwargs)
138137

139138
if target.final:
140-
self._external_queue.clear()
139+
self.clear()
141140
self._running = False
142141

143142
if len(result) == 0:

statemachine/statemachine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def __setstate__(self, state):
148148
self._listeners: Dict[Any, Any] = {}
149149

150150
self._register_callbacks([])
151-
self.add_listener(*listeners.keys())
151+
self.add_listener(*listeners.values())
152152
self._engine = self._get_engine(rtc)
153153

154154
def _get_initial_state(self):

0 commit comments

Comments
 (0)