Skip to content

Commit 5769fba

Browse files
committed
refactor: separate emit logic into internal method and add emitAsync for asynchronous processing
1 parent 54383b8 commit 5769fba

File tree

1 file changed

+32
-6
lines changed

1 file changed

+32
-6
lines changed

packages/core/src/emitter/transactional-event-emitter.ts

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,31 @@ export class TransactionalEventEmitter {
2424
@Inject(EVENT_CONFIGURATION_RESOLVER_TOKEN) private eventConfigurationResolver: EventConfigurationResolverContract,
2525
) {}
2626

27-
async emit(
27+
private async emitInternal(
2828
event: InboxOutboxEvent,
2929
entities: {
3030
operation: TransactionalEventEmitterOperations;
3131
entity: object;
3232
}[],
3333
customDatabaseDriverPersister?: DatabaseDriverPersister,
34+
awaitProcessor: boolean = false,
3435
): Promise<void> {
3536
const eventOptions: InboxOutboxModuleEventOptions = this.options.events.find((optionEvent) => optionEvent.name === event.name);
36-
3737
if (!eventOptions) {
3838
throw new Error(`Event ${event.name} is not configured. Did you forget to add it to the module options?`);
3939
}
40-
40+
4141
const databaseDriver = this.databaseDriverFactory.create(this.eventConfigurationResolver);
4242
const currentTimestamp = new Date().getTime();
43-
43+
4444
const inboxOutboxTransportEvent = databaseDriver.createInboxOutboxTransportEvent(
4545
event.name,
4646
event,
4747
currentTimestamp + eventOptions.listeners.expiresAtTTL,
4848
currentTimestamp + eventOptions.listeners.readyToRetryAfterTTL,
4949
);
50-
5150
const persister = customDatabaseDriverPersister ?? databaseDriver;
52-
51+
5352
entities.forEach((entity) => {
5453
if (entity.operation === TransactionalEventEmitterOperations.persist) {
5554
persister.persist(entity.entity);
@@ -62,9 +61,36 @@ export class TransactionalEventEmitter {
6261
persister.persist(inboxOutboxTransportEvent);
6362
await persister.flush();
6463

64+
if (awaitProcessor) {
65+
await this.inboxOutboxEventProcessor.process(eventOptions, inboxOutboxTransportEvent, this.getListeners(event.name));
66+
return;
67+
}
68+
6569
this.inboxOutboxEventProcessor.process(eventOptions, inboxOutboxTransportEvent, this.getListeners(event.name));
6670
}
6771

72+
async emit(
73+
event: InboxOutboxEvent,
74+
entities: {
75+
operation: TransactionalEventEmitterOperations;
76+
entity: object;
77+
}[],
78+
customDatabaseDriverPersister?: DatabaseDriverPersister,
79+
): Promise<void> {
80+
return this.emitInternal(event, entities, customDatabaseDriverPersister, false);
81+
}
82+
83+
async emitAsync(
84+
event: InboxOutboxEvent,
85+
entities: {
86+
operation: TransactionalEventEmitterOperations;
87+
entity: object;
88+
}[],
89+
customDatabaseDriverPersister?: DatabaseDriverPersister,
90+
): Promise<void> {
91+
return this.emitInternal(event, entities, customDatabaseDriverPersister, true);
92+
}
93+
6894
addListener<TPayload>(eventName: string, listener: IListener<TPayload>): void {
6995
const previousListeners = this.listeners.get(eventName) || [];
7096
if (previousListeners.some((previousListener) => previousListener.getName() === listener.getName())) {

0 commit comments

Comments
 (0)