Skip to content

Commit ebc868e

Browse files
authored
Merge pull request #272 from patchlevel/DCB
add "experimental" dcb feature
2 parents b8c716c + 4318fef commit ebc868e

File tree

3 files changed

+119
-29
lines changed

3 files changed

+119
-29
lines changed

src/DependencyInjection/Configuration.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
* },
7979
* clock: array{freeze: ?string, service: ?string},
8080
* aggregate_handlers: array{enabled: bool, bus: string|null},
81+
* dcb: array{enabled: bool},
8182
* }
8283
*/
8384
final class Configuration implements ConfigurationInterface
@@ -105,7 +106,7 @@ public function getConfigTreeBuilder(): TreeBuilder
105106
->addDefaultsIfNotSet()
106107
->children()
107108
->enumNode('type')
108-
->values(['dbal_stream', 'in_memory', 'custom'])
109+
->values(['dbal_stream', 'dbal_taggable', 'in_memory', 'custom'])
109110
->defaultValue('dbal_stream')
110111
->end()
111112
->scalarNode('service')->defaultNull()->end()
@@ -117,7 +118,7 @@ public function getConfigTreeBuilder(): TreeBuilder
117118
->addDefaultsIfNotSet()
118119
->children()
119120
->enumNode('type')
120-
->values(['dbal_stream', 'in_memory', 'custom'])
121+
->values(['dbal_stream', 'dbal_taggable', 'in_memory', 'custom'])
121122
->end()
122123
->scalarNode('service')->defaultNull()->end()
123124
->arrayNode('options')->variablePrototype()->end()->end()
@@ -334,6 +335,10 @@ public function getConfigTreeBuilder(): TreeBuilder
334335
->end()
335336
->end()
336337

338+
->arrayNode('dcb')
339+
->canBeEnabled()
340+
->end()
341+
337342
->arrayNode('aggregate_handlers')
338343
->canBeEnabled()
339344
->addDefaultsIfNotSet()

src/DependencyInjection/PatchlevelEventSourcingExtension.php

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
use Patchlevel\EventSourcing\Console\Command\WatchCommand;
4646
use Patchlevel\EventSourcing\Console\DoctrineHelper;
4747
use Patchlevel\EventSourcing\Cryptography\DoctrineCipherKeyStore;
48+
use Patchlevel\EventSourcing\DecisionModel\DecisionModelBuilder;
49+
use Patchlevel\EventSourcing\DecisionModel\EventAppender;
50+
use Patchlevel\EventSourcing\DecisionModel\StoreDecisionModelBuilder;
51+
use Patchlevel\EventSourcing\DecisionModel\StoreEventAppender;
4852
use Patchlevel\EventSourcing\EventBus\AttributeListenerProvider;
4953
use Patchlevel\EventSourcing\EventBus\Consumer;
5054
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
@@ -70,6 +74,7 @@
7074
use Patchlevel\EventSourcing\QueryBus\QueryBus;
7175
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
7276
use Patchlevel\EventSourcing\Repository\MessageDecorator\ChainMessageDecorator;
77+
use Patchlevel\EventSourcing\Repository\MessageDecorator\EventTagDecorator;
7378
use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator;
7479
use Patchlevel\EventSourcing\Repository\MessageDecorator\SplitStreamDecorator;
7580
use Patchlevel\EventSourcing\Repository\RepositoryManager;
@@ -80,20 +85,23 @@
8085
use Patchlevel\EventSourcing\Schema\DoctrineSchemaListener;
8186
use Patchlevel\EventSourcing\Schema\DoctrineSchemaProvider;
8287
use Patchlevel\EventSourcing\Schema\SchemaDirector;
88+
use Patchlevel\EventSourcing\Serializer\AttributeEventTagExtractor;
8389
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
8490
use Patchlevel\EventSourcing\Serializer\Encoder\Encoder;
8591
use Patchlevel\EventSourcing\Serializer\Encoder\JsonEncoder;
8692
use Patchlevel\EventSourcing\Serializer\EventSerializer;
93+
use Patchlevel\EventSourcing\Serializer\EventTagExtractor;
8794
use Patchlevel\EventSourcing\Serializer\Upcast\Upcaster;
8895
use Patchlevel\EventSourcing\Serializer\Upcast\UpcasterChain;
8996
use Patchlevel\EventSourcing\Snapshot\Adapter\Psr16SnapshotAdapter;
9097
use Patchlevel\EventSourcing\Snapshot\Adapter\Psr6SnapshotAdapter;
9198
use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore;
9299
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
93100
use Patchlevel\EventSourcing\Store\InMemoryStore;
101+
use Patchlevel\EventSourcing\Store\ReadOnlyStore;
94102
use Patchlevel\EventSourcing\Store\Store;
95103
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
96-
use Patchlevel\EventSourcing\Store\StreamReadOnlyStore;
104+
use Patchlevel\EventSourcing\Store\TaggableDoctrineDbalStore;
97105
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
98106
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
99107
use Patchlevel\EventSourcing\Subscription\Engine\GapResolverStoreMessageLoader;
@@ -188,6 +196,7 @@ public function load(array $configs, ContainerBuilder $container): void
188196
$this->configureMigration($config, $container);
189197
$this->configureValueResolver($container);
190198
$this->configureStoreMigration($config, $container);
199+
$this->configureDCB($config, $container);
191200
}
192201

193202
/** @param Config $config */
@@ -230,6 +239,9 @@ private function configureSerializer(array $config, ContainerBuilder $container)
230239
]);
231240

232241
$container->setAlias(HeadersSerializer::class, DefaultHeadersSerializer::class);
242+
243+
$container->register(AttributeEventTagExtractor::class);
244+
$container->setAlias(EventTagExtractor::class, AttributeEventTagExtractor::class);
233245
}
234246

235247
/** @param Config $config */
@@ -627,6 +639,10 @@ private function configureMessageDecorator(ContainerBuilder $container): void
627639
->setArguments([new Reference(EventMetadataFactory::class)])
628640
->addTag('event_sourcing.message_decorator');
629641

642+
$container->register(EventTagDecorator::class)
643+
->setArguments([new Reference(EventTagExtractor::class)])
644+
->addTag('event_sourcing.message_decorator');
645+
630646
$container->registerForAutoconfiguration(MessageDecorator::class)
631647
->addTag('event_sourcing.message_decorator');
632648

@@ -721,7 +737,7 @@ private function configureStore(array $config, ContainerBuilder $container): voi
721737
$container->setAlias(Store::class, StreamDoctrineDbalStore::class);
722738

723739
if ($config['store']['read_only']) {
724-
$container->register(StreamReadOnlyStore::class)
740+
$container->register(ReadOnlyStore::class)
725741
->setDecoratedService(Store::class)
726742
->setArguments([
727743
new Reference('.inner'),
@@ -731,6 +747,27 @@ private function configureStore(array $config, ContainerBuilder $container): voi
731747
return;
732748
}
733749

750+
if ($config['store']['type'] === 'dbal_taggable') {
751+
$container->register(TaggableDoctrineDbalStore::class)
752+
->setArguments([
753+
new Reference('event_sourcing.dbal_connection'),
754+
new Reference(EventSerializer::class),
755+
new Reference(EventRegistry::class),
756+
new Reference(HeadersSerializer::class),
757+
new Reference('event_sourcing.clock'),
758+
$config['store']['options'],
759+
])
760+
->addTag('event_sourcing.doctrine_schema_configurator');
761+
762+
$container->setAlias(Store::class, TaggableDoctrineDbalStore::class);
763+
764+
if ($config['store']['read_only']) {
765+
throw new InvalidArgumentException('Taggable store does not support read only');
766+
}
767+
768+
return;
769+
}
770+
734771
throw new InvalidArgumentException(sprintf('Unknown store type "%s"', $config['store']['type']));
735772
}
736773

@@ -784,6 +821,20 @@ private function configureStoreMigration(array $config, ContainerBuilder $contai
784821
return;
785822
}
786823

824+
if ($config['store']['migrate_to_new_store']['type'] === 'dbal_taggable') {
825+
$container->register($id, TaggableDoctrineDbalStore::class)
826+
->setArguments([
827+
new Reference('event_sourcing.dbal_connection'),
828+
new Reference(EventSerializer::class),
829+
new Reference(HeadersSerializer::class),
830+
new Reference('event_sourcing.clock'),
831+
$config['store']['migrate_to_new_store']['options'],
832+
])
833+
->addTag('event_sourcing.doctrine_schema_configurator');
834+
835+
return;
836+
}
837+
787838
throw new InvalidArgumentException(sprintf('Unknown store type "%s"', $config['store']['type']));
788839
}
789840

@@ -1135,6 +1186,31 @@ private function configureCryptography(array $config, ContainerBuilder $containe
11351186
$container->setAlias(PayloadCryptographer::class, PersonalDataPayloadCryptographer::class);
11361187
}
11371188

1189+
/** @param Config $config */
1190+
private function configureDCB(array $config, ContainerBuilder $container): void
1191+
{
1192+
if (!$config['dcb']['enabled']) {
1193+
return;
1194+
}
1195+
1196+
if ($config['store']['type'] !== 'dbal_taggable') {
1197+
throw new InvalidArgumentException(
1198+
'DCB requires a taggable store, please use "dbal_taggable" as store type.',
1199+
);
1200+
}
1201+
1202+
$container->register(StoreDecisionModelBuilder::class)
1203+
->setArguments([new Reference(TaggableDoctrineDbalStore::class)]);
1204+
$container->setAlias(DecisionModelBuilder::class, StoreDecisionModelBuilder::class);
1205+
1206+
$container->register(StoreEventAppender::class)
1207+
->setArguments([
1208+
new Reference(TaggableDoctrineDbalStore::class),
1209+
new Reference(EventTagExtractor::class),
1210+
]);
1211+
$container->setAlias(EventAppender::class, StoreEventAppender::class);
1212+
}
1213+
11381214
private function configureValueResolver(ContainerBuilder $container): void
11391215
{
11401216
$container->register(IdentifierValueResolver::class)

tests/Unit/PatchlevelEventSourcingBundleTest.php

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
use Patchlevel\EventSourcing\Console\Command\SubscriptionStatusCommand;
3535
use Patchlevel\EventSourcing\Console\Command\SubscriptionTeardownCommand;
3636
use Patchlevel\EventSourcing\Console\Command\WatchCommand;
37+
use Patchlevel\EventSourcing\Serializer\AttributeEventTagExtractor;
38+
use Patchlevel\EventSourcing\DecisionModel\DecisionModelBuilder;
39+
use Patchlevel\EventSourcing\DecisionModel\EventAppender;
40+
use Patchlevel\EventSourcing\Serializer\EventTagExtractor;
41+
use Patchlevel\EventSourcing\DecisionModel\StoreDecisionModelBuilder;
42+
use Patchlevel\EventSourcing\DecisionModel\StoreEventAppender;
3743
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
3844
use Patchlevel\EventSourcing\EventBus\EventBus;
3945
use Patchlevel\EventSourcing\EventBus\Psr14EventBus;
@@ -61,8 +67,7 @@
6167
use Patchlevel\EventSourcing\Store\InMemoryStore;
6268
use Patchlevel\EventSourcing\Store\Store;
6369
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
64-
use Patchlevel\EventSourcing\Store\StreamReadOnlyStore;
65-
use Patchlevel\EventSourcing\Store\StreamStore;
70+
use Patchlevel\EventSourcing\Store\ReadOnlyStore;
6671
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
6772
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
6873
use Patchlevel\EventSourcing\Subscription\Engine\GapResolverStoreMessageLoader;
@@ -154,6 +159,7 @@ public function testMinimalConfig(): void
154159
self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class));
155160
self::assertInstanceOf(SystemClock::class, $container->get('event_sourcing.clock'));
156161
self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class));
162+
self::assertInstanceOf(AttributeEventTagExtractor::class, $container->get(EventTagExtractor::class));
157163

158164
self::assertFalse($container->has(EventBus::class));
159165

@@ -278,7 +284,7 @@ public function testStreamStore(): void
278284
]
279285
);
280286

281-
self::assertInstanceOf(StreamStore::class, $container->get(Store::class));
287+
self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class));
282288
}
283289

284290
public function testInMemoryStore(): void
@@ -318,7 +324,7 @@ public function testReadOnlyStore(): void
318324
]
319325
);
320326

321-
self::assertInstanceOf(StreamReadOnlyStore::class, $container->get(Store::class));
327+
self::assertInstanceOf(ReadOnlyStore::class, $container->get(Store::class));
322328
}
323329

324330
public function testMigrateStore(): void
@@ -365,27 +371,6 @@ public function testMigrateStore(): void
365371
);
366372
}
367373

368-
public function testStreamReadOnlyStore(): void
369-
{
370-
$container = new ContainerBuilder();
371-
$this->compileContainer(
372-
$container,
373-
[
374-
'patchlevel_event_sourcing' => [
375-
'connection' => [
376-
'service' => 'doctrine.dbal.eventstore_connection',
377-
],
378-
'store' => [
379-
'type' => 'dbal_stream',
380-
'read_only' => true,
381-
]
382-
],
383-
]
384-
);
385-
386-
self::assertInstanceOf(StreamReadOnlyStore::class, $container->get(Store::class));
387-
}
388-
389374
public function testSymfonyEventBus(): void
390375
{
391376
$eventBus = $this->prophesize(MessageBusInterface::class)->reveal();
@@ -662,6 +647,30 @@ public function testQueryBus(): void
662647
self::assertEquals('foo', $handler->{$tag['method']}(new QueryFoo('foo')));
663648
}
664649

650+
651+
public function testDCB(): void
652+
{
653+
$container = new ContainerBuilder();
654+
655+
$this->compileContainer(
656+
$container,
657+
[
658+
'patchlevel_event_sourcing' => [
659+
'connection' => [
660+
'service' => 'doctrine.dbal.eventstore_connection',
661+
],
662+
'store' => [
663+
'type' => 'dbal_taggable',
664+
],
665+
'dcb' => true,
666+
],
667+
]
668+
);
669+
670+
self::assertInstanceOf(StoreEventAppender::class, $container->get(EventAppender::class));
671+
self::assertInstanceOf(StoreDecisionModelBuilder::class, $container->get(DecisionModelBuilder::class));
672+
}
673+
665674
public function testMessageLoader(): void
666675
{
667676
$container = new ContainerBuilder();

0 commit comments

Comments
 (0)