1
+ import logging
1
2
from dataclasses import dataclass
2
3
from itertools import chain
3
4
from queue import PriorityQueue
4
5
from queue import Queue
5
6
from threading import Lock
6
7
from typing import TYPE_CHECKING
8
+ from typing import Any
7
9
from typing import Callable
10
+ from typing import Dict
8
11
from typing import List
12
+ from typing import cast
9
13
from weakref import proxy
10
14
11
15
from ..event import BoundEvent
20
24
if TYPE_CHECKING :
21
25
from ..statemachine import StateMachine
22
26
27
+ logger = logging .getLogger (__name__ )
28
+
23
29
24
30
@dataclass (frozen = True , unsafe_hash = True )
25
31
class StateTransition :
@@ -76,7 +82,7 @@ def empty(self):
76
82
def put (self , trigger_data : TriggerData , internal : bool = False ):
77
83
"""Put the trigger on the queue without blocking the caller."""
78
84
if not self .running and not self .sm .allow_event_without_transition :
79
- raise TransitionNotAllowed (trigger_data .event , self .sm .current_state )
85
+ raise TransitionNotAllowed (trigger_data .event , self .sm .configuration )
80
86
81
87
if internal :
82
88
self .internal_queue .put (trigger_data )
@@ -117,7 +123,9 @@ def _conditions_match(self, transition: Transition, trigger_data: TriggerData):
117
123
self .sm ._callbacks .call (transition .validators .key , * args , ** kwargs )
118
124
return self .sm ._callbacks .all (transition .cond .key , * args , ** kwargs )
119
125
120
- def _filter_conflicting_transitions (self , transitions : OrderedSet [Transition ]):
126
+ def _filter_conflicting_transitions (
127
+ self , transitions : OrderedSet [Transition ]
128
+ ) -> OrderedSet [Transition ]:
121
129
"""
122
130
Remove transições conflitantes, priorizando aquelas com estados de origem descendentes
123
131
ou que aparecem antes na ordem do documento.
@@ -128,18 +136,18 @@ def _filter_conflicting_transitions(self, transitions: OrderedSet[Transition]):
128
136
Returns:
129
137
OrderedSet[Transition]: Conjunto de transições sem conflitos.
130
138
"""
131
- filtered_transitions = OrderedSet ()
139
+ filtered_transitions = OrderedSet [ Transition ] ()
132
140
133
141
# Ordena as transições na ordem dos estados que as selecionaram
134
142
for t1 in transitions :
135
143
t1_preempted = False
136
- transitions_to_remove = OrderedSet ()
144
+ transitions_to_remove = OrderedSet [ Transition ] ()
137
145
138
146
# Verifica conflitos com as transições já filtradas
139
147
for t2 in filtered_transitions :
140
148
# Calcula os conjuntos de saída (exit sets)
141
- t1_exit_set = self ._compute_exit_set (t1 )
142
- t2_exit_set = self ._compute_exit_set (t2 )
149
+ t1_exit_set = self ._compute_exit_set ([ t1 ] )
150
+ t2_exit_set = self ._compute_exit_set ([ t2 ] )
143
151
144
152
# Verifica interseção dos conjuntos de saída
145
153
if t1_exit_set & t2_exit_set : # Há interseção
@@ -162,7 +170,7 @@ def _filter_conflicting_transitions(self, transitions: OrderedSet[Transition]):
162
170
def _compute_exit_set (self , transitions : List [Transition ]) -> OrderedSet [StateTransition ]:
163
171
"""Compute the exit set for a transition."""
164
172
165
- states_to_exit = OrderedSet ()
173
+ states_to_exit = OrderedSet [ StateTransition ] ()
166
174
167
175
for transition in transitions :
168
176
if transition .target is None :
@@ -193,6 +201,8 @@ def get_transition_domain(self, transition: Transition) -> "State | None":
193
201
and all (state .is_descendant (transition .source ) for state in states )
194
202
):
195
203
return transition .source
204
+ elif transition .internal and transition .is_self and transition .target .is_atomic :
205
+ return transition .source
196
206
else :
197
207
return self .find_lcca ([transition .source ] + list (states ))
198
208
@@ -213,6 +223,7 @@ def find_lcca(states: List[State]) -> "State | None":
213
223
ancestors = [anc for anc in head .ancestors () if anc .is_compound ]
214
224
215
225
# Find the first ancestor that is also an ancestor of all other states in the list
226
+ ancestor : State
216
227
for ancestor in ancestors :
217
228
if all (state .is_descendant (ancestor ) for state in tail ):
218
229
return ancestor
@@ -233,13 +244,16 @@ def _select_transitions(
233
244
self , trigger_data : TriggerData , predicate : Callable
234
245
) -> OrderedSet [Transition ]:
235
246
"""Select the transitions that match the trigger data."""
236
- enabled_transitions = OrderedSet ()
247
+ enabled_transitions = OrderedSet [ Transition ] ()
237
248
238
249
# Get atomic states, TODO: sorted by document order
239
250
atomic_states = (state for state in self .sm .configuration if state .is_atomic )
240
251
241
- def first_transition_that_matches (state : State , event : Event ) -> "Transition | None" :
252
+ def first_transition_that_matches (
253
+ state : State , event : "Event | None"
254
+ ) -> "Transition | None" :
242
255
for s in chain ([state ], state .ancestors ()):
256
+ transition : Transition
243
257
for transition in s .transitions :
244
258
if (
245
259
not transition .initial
@@ -248,6 +262,8 @@ def first_transition_that_matches(state: State, event: Event) -> "Transition | N
248
262
):
249
263
return transition
250
264
265
+ return None
266
+
251
267
for state in atomic_states :
252
268
transition = first_transition_that_matches (state , trigger_data .event )
253
269
if transition is not None :
@@ -264,6 +280,7 @@ def microstep(self, transitions: List[Transition], trigger_data: TriggerData):
264
280
)
265
281
266
282
states_to_exit = self ._exit_states (transitions , trigger_data )
283
+ logger .debug ("States to exit: %s" , states_to_exit )
267
284
result += self ._execute_transition_content (transitions , trigger_data , lambda t : t .on .key )
268
285
self ._enter_states (transitions , trigger_data , states_to_exit )
269
286
self ._execute_transition_content (
@@ -304,7 +321,7 @@ def _exit_states(self, enabled_transitions: List[Transition], trigger_data: Trig
304
321
# self.history_values[history.id] = history_value
305
322
306
323
# Execute `onexit` handlers
307
- if info .source is not None and not info .transition .internal :
324
+ if info .source is not None : # TODO: and not info.transition.internal:
308
325
self .sm ._callbacks .call (info .source .exit .key , * args , ** kwargs )
309
326
310
327
# TODO: Cancel invocations
@@ -343,22 +360,29 @@ def _enter_states(
343
360
"""Enter the states as determined by the given transitions."""
344
361
states_to_enter = OrderedSet [StateTransition ]()
345
362
states_for_default_entry = OrderedSet [StateTransition ]()
346
- default_history_content = {}
363
+ default_history_content : Dict [ str , Any ] = {}
347
364
348
365
# Compute the set of states to enter
349
366
self .compute_entry_set (
350
367
enabled_transitions , states_to_enter , states_for_default_entry , default_history_content
351
368
)
352
369
353
370
# We update the configuration atomically
354
- states_targets_to_enter = OrderedSet (info .target for info in states_to_enter )
371
+ states_targets_to_enter = OrderedSet (
372
+ info .target for info in states_to_enter if info .target
373
+ )
374
+ logger .debug ("States to enter: %s" , states_targets_to_enter )
375
+
355
376
configuration = self .sm .configuration
356
- self .sm .configuration = (configuration - states_to_exit ) | states_targets_to_enter
377
+ self .sm .configuration = cast (
378
+ OrderedSet [State ], (configuration - states_to_exit ) | states_targets_to_enter
379
+ )
357
380
358
381
# Sort states to enter in entry order
359
382
# for state in sorted(states_to_enter, key=self.entry_order): # TODO: ordegin of states_to_enter # noqa: E501
360
383
for info in states_to_enter :
361
384
target = info .target
385
+ assert target
362
386
transition = info .transition
363
387
event_data = EventData (trigger_data = trigger_data , transition = transition )
364
388
event_data .state = target
@@ -376,8 +400,8 @@ def _enter_states(
376
400
# state.is_first_entry = False
377
401
378
402
# Execute `onentry` handlers
379
- if not transition .internal :
380
- self .sm ._callbacks .call (target .enter .key , * args , ** kwargs )
403
+ # TODO: if not transition.internal:
404
+ self .sm ._callbacks .call (target .enter .key , * args , ** kwargs )
381
405
382
406
# Handle default initial states
383
407
# TODO: Handle default initial states
@@ -396,11 +420,17 @@ def _enter_states(
396
420
parent = target .parent
397
421
grandparent = parent .parent
398
422
399
- self .internal_queue .put (BoundEvent (f"done.state.{ parent .id } " , _sm = self .sm ))
423
+ self .internal_queue .put (
424
+ BoundEvent (f"done.state.{ parent .id } " , _sm = self .sm ).build_trigger (
425
+ machine = self .sm
426
+ )
427
+ )
400
428
if grandparent .parallel :
401
429
if all (child .final for child in grandparent .states ):
402
430
self .internal_queue .put (
403
- BoundEvent (f"done.state.{ parent .id } " , _sm = self .sm )
431
+ BoundEvent (f"done.state.{ parent .id } " , _sm = self .sm ).build_trigger (
432
+ machine = self .sm
433
+ )
404
434
)
405
435
406
436
def compute_entry_set (
@@ -476,8 +506,19 @@ def add_descendant_states_to_enter(
476
506
# return
477
507
478
508
# Add the state to the entry set
479
- states_to_enter .add (info )
509
+ if (
510
+ not self .sm .enable_self_transition_entries
511
+ and info .transition .internal
512
+ and (
513
+ info .transition .is_self
514
+ or info .transition .target .is_descendant (info .transition .source )
515
+ )
516
+ ):
517
+ pass
518
+ else :
519
+ states_to_enter .add (info )
480
520
state = info .target
521
+ assert state
481
522
482
523
if state .is_compound :
483
524
# Handle compound states
@@ -541,6 +582,7 @@ def add_ancestor_states_to_enter(
541
582
default_history_content: A dictionary to hold temporary content for history states.
542
583
"""
543
584
state = info .target
585
+ assert state
544
586
for anc in state .ancestors (parent = ancestor ):
545
587
# Add the ancestor to the entry set
546
588
info_to_add = StateTransition (
0 commit comments