66
77use Closure ;
88use Patchlevel \EventSourcing \Aggregate \AggregateHeader ;
9+ use Patchlevel \EventSourcing \Clock \SystemClock ;
910use Patchlevel \EventSourcing \Message \HeaderNotFound ;
1011use Patchlevel \EventSourcing \Message \Message ;
12+ use Patchlevel \EventSourcing \Metadata \Event \EventRegistry ;
1113use Patchlevel \EventSourcing \Store \Criteria \AggregateIdCriterion ;
1214use Patchlevel \EventSourcing \Store \Criteria \AggregateNameCriterion ;
1315use Patchlevel \EventSourcing \Store \Criteria \ArchivedCriterion ;
1416use Patchlevel \EventSourcing \Store \Criteria \Criteria ;
17+ use Patchlevel \EventSourcing \Store \Criteria \EventsCriterion ;
1518use Patchlevel \EventSourcing \Store \Criteria \FromIndexCriterion ;
1619use Patchlevel \EventSourcing \Store \Criteria \FromPlayheadCriterion ;
1720use Patchlevel \EventSourcing \Store \Criteria \StreamCriterion ;
21+ use Patchlevel \EventSourcing \Store \Criteria \ToIndexCriterion ;
22+ use Patchlevel \EventSourcing \Store \Header \EventIdHeader ;
23+ use Patchlevel \EventSourcing \Store \Header \IndexHeader ;
1824use Patchlevel \EventSourcing \Store \Header \PlayheadHeader ;
25+ use Patchlevel \EventSourcing \Store \Header \RecordedOnHeader ;
1926use Patchlevel \EventSourcing \Store \Header \StreamNameHeader ;
27+ use Psr \Clock \ClockInterface ;
28+ use Ramsey \Uuid \Uuid ;
29+ use Throwable ;
2030
2131use function array_filter ;
2232use function array_map ;
23- use function array_push ;
2433use function array_reverse ;
2534use function array_slice ;
2635use function array_unique ;
3544
3645final class InMemoryStore implements StreamStore
3746{
38- /** @param array<positive-int|0, Message> $messages */
47+ /** @var array<0|positive-int, Message> */
48+ private array $ messages = [];
49+
50+ /** @param list<Message> $messages */
3951 public function __construct (
40- private array $ messages = [],
52+ array $ messages = [],
53+ private readonly EventRegistry |null $ eventRegistry = null ,
54+ private readonly ClockInterface $ clock = new SystemClock (),
4155 ) {
56+ $ this ->save (...$ messages );
4257 }
4358
4459 public function load (
@@ -71,7 +86,27 @@ public function count(Criteria|null $criteria = null): int
7186
7287 public function save (Message ...$ messages ): void
7388 {
74- array_push ($ this ->messages , ...$ messages );
89+ $ this ->transactional (function () use ($ messages ): void {
90+ $ count = count ($ this ->messages );
91+
92+ foreach ($ messages as $ message ) {
93+ $ count ++;
94+
95+ if (!$ message ->hasHeader (IndexHeader::class)) {
96+ $ message = $ message ->withHeader (new IndexHeader ($ count ));
97+ }
98+
99+ if (!$ message ->hasHeader (EventIdHeader::class)) {
100+ $ message = $ message ->withHeader (new EventIdHeader (Uuid::uuid7 ()->toString ()));
101+ }
102+
103+ if (!$ message ->hasHeader (RecordedOnHeader::class)) {
104+ $ message = $ message ->withHeader (new RecordedOnHeader ($ this ->clock ->now ()));
105+ }
106+
107+ $ this ->messages [] = $ message ;
108+ }
109+ });
75110 }
76111
77112 /**
@@ -81,7 +116,14 @@ public function save(Message ...$messages): void
81116 */
82117 public function transactional (Closure $ function ): void
83118 {
84- $ function ();
119+ $ messages = $ this ->messages ;
120+ try {
121+ $ function ();
122+ } catch (Throwable $ e ) {
123+ $ this ->messages = $ messages ;
124+
125+ throw $ e ;
126+ }
85127 }
86128
87129 /** @return list<string> */
@@ -134,9 +176,11 @@ private function filter(Criteria|null $criteria): array
134176 return $ this ->messages ;
135177 }
136178
179+ $ eventRegistry = $ this ->eventRegistry ;
180+
137181 return array_filter (
138182 $ this ->messages ,
139- static function (Message $ message, int $ index ) use ($ criteria ): bool {
183+ static function (Message $ message ) use ($ criteria, $ eventRegistry ): bool {
140184 foreach ($ criteria ->all () as $ criterion ) {
141185 switch ($ criterion ::class) {
142186 case AggregateIdCriterion::class:
@@ -222,7 +266,35 @@ static function (Message $message, int $index) use ($criteria): bool {
222266
223267 break ;
224268 case FromIndexCriterion::class:
225- if ($ index < $ criterion ->fromIndex ) {
269+ try {
270+ $ index = $ message ->header (IndexHeader::class)->index ;
271+ } catch (HeaderNotFound ) {
272+ return false ;
273+ }
274+
275+ if ($ index <= $ criterion ->fromIndex ) {
276+ return false ;
277+ }
278+
279+ break ;
280+ case ToIndexCriterion::class:
281+ try {
282+ $ index = $ message ->header (IndexHeader::class)->index ;
283+ } catch (HeaderNotFound ) {
284+ return false ;
285+ }
286+
287+ if ($ index >= $ criterion ->toIndex ) {
288+ return false ;
289+ }
290+
291+ break ;
292+ case EventsCriterion::class:
293+ if ($ eventRegistry === null ) {
294+ throw new MissingEventRegistry ($ criterion ::class);
295+ }
296+
297+ if (!in_array ($ eventRegistry ->eventName ($ message ->event ()::class), $ criterion ->events )) {
226298 return false ;
227299 }
228300
0 commit comments