Skip to content

Commit 7b58646

Browse files
committed
Remove CancellationToken from persistence ops
Writing persistence operations can no longer be cancelled to prevent data loss when cancelling a workflow. Fixes #953, #1032
1 parent 088bc1a commit 7b58646

File tree

15 files changed

+163
-163
lines changed

15 files changed

+163
-163
lines changed

src/WorkflowCore/Interface/Persistence/IEventRepository.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@ namespace WorkflowCore.Interface
88
{
99
public interface IEventRepository
1010
{
11-
Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = default);
11+
Task<string> CreateEvent(Event newEvent);
1212

1313
Task<Event> GetEvent(string id, CancellationToken cancellationToken = default);
1414

1515
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = default);
1616

1717
Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1818

19-
Task MarkEventProcessed(string id, CancellationToken cancellationToken = default);
19+
Task MarkEventProcessed(string id);
2020

21-
Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default);
21+
Task MarkEventUnprocessed(string id);
2222

2323
}
2424
}

src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace WorkflowCore.Interface
99
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
1010
{
1111

12-
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
12+
Task PersistErrors(IEnumerable<ExecutionError> errors);
1313

1414
void EnsureStoreExists();
1515

src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,19 @@ namespace WorkflowCore.Interface
88
{
99
public interface ISubscriptionRepository
1010
{
11-
Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default);
11+
Task<string> CreateEventSubscription(EventSubscription subscription);
1212

1313
Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1414

15-
Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
15+
Task TerminateSubscription(string eventSubscriptionId);
1616

1717
Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
1818

1919
Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
2020

21-
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);
21+
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry);
2222

23-
Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
23+
Task ClearSubscriptionToken(string eventSubscriptionId, string token);
2424

2525
}
2626
}

src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ namespace WorkflowCore.Interface
88
{
99
public interface IWorkflowRepository
1010
{
11-
Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
11+
Task<string> CreateNewWorkflow(WorkflowInstance workflow);
1212

13-
Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
13+
Task PersistWorkflow(WorkflowInstance workflow);
1414

1515
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
1616

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5959
if (activity == null)
6060
{
6161
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
62-
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
62+
await _eventRepository.MarkEventProcessed(itemId);
6363
return;
6464
}
6565
subs = new List<EventSubscription> { activity };
@@ -77,7 +77,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7777

7878
if (complete)
7979
{
80-
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
80+
await _eventRepository.MarkEventProcessed(itemId);
8181
}
8282
else
8383
{
@@ -135,8 +135,8 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
135135
p.Active = true;
136136
}
137137
workflow.NextExecution = 0;
138-
await _workflowRepository.PersistWorkflow(workflow, cancellationToken);
139-
await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken);
138+
await _workflowRepository.PersistWorkflow(workflow);
139+
await _subscriptionRepository.TerminateSubscription(sub.Id);
140140
return true;
141141
}
142142
catch (Exception ex)

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555
finally
5656
{
5757
WorkflowActivity.Enrich(result);
58-
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
58+
await _persistenceStore.PersistWorkflow(workflow);
5959
await QueueProvider.QueueWork(itemId, QueueType.Index);
6060
_greylist.Remove($"wf:{itemId}");
6161
}
@@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7171
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
7272
}
7373

74-
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
74+
await _persistenceStore.PersistErrors(result.Errors);
7575

7676
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
7777
{
@@ -103,7 +103,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
103103
//TODO: move to own class
104104
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
105105

106-
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
106+
await persistenceStore.CreateEventSubscription(subscription);
107107
if (subscription.EventName != Event.EventTypeActivity)
108108
{
109109
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
@@ -131,7 +131,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
131131
else
132132
{
133133
_greylist.Remove(eventKey);
134-
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
134+
await persistenceStore.MarkEventUnprocessed(evt);
135135
await QueueProvider.QueueWork(evt, QueueType.Event);
136136
}
137137
}

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider
2626

2727
public bool SupportsScheduledCommands => false;
2828

29-
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
29+
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
3030
{
3131
lock (_instances)
3232
{
@@ -36,7 +36,7 @@ public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, Cancellat
3636
}
3737
}
3838

39-
public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
39+
public async Task PersistWorkflow(WorkflowInstance workflow)
4040
{
4141
lock (_instances)
4242
{
@@ -107,7 +107,7 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
107107
}
108108

109109

110-
public async Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default)
110+
public async Task<string> CreateEventSubscription(EventSubscription subscription)
111111
{
112112
lock (_subscriptions)
113113
{
@@ -126,7 +126,7 @@ public async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventN
126126
}
127127
}
128128

129-
public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
129+
public async Task TerminateSubscription(string eventSubscriptionId)
130130
{
131131
lock (_subscriptions)
132132
{
@@ -154,7 +154,7 @@ public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string
154154
}
155155
}
156156

157-
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
157+
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
158158
{
159159
lock (_subscriptions)
160160
{
@@ -167,7 +167,7 @@ public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token,
167167
}
168168
}
169169

170-
public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default)
170+
public Task ClearSubscriptionToken(string eventSubscriptionId, string token)
171171
{
172172
lock (_subscriptions)
173173
{
@@ -186,7 +186,7 @@ public void EnsureStoreExists()
186186
{
187187
}
188188

189-
public async Task<string> CreateEvent(Event newEvent, CancellationToken _ = default)
189+
public async Task<string> CreateEvent(Event newEvent)
190190
{
191191
lock (_events)
192192
{
@@ -196,7 +196,7 @@ public async Task<string> CreateEvent(Event newEvent, CancellationToken _ = defa
196196
}
197197
}
198198

199-
public async Task MarkEventProcessed(string id, CancellationToken _ = default)
199+
public async Task MarkEventProcessed(string id)
200200
{
201201
lock (_events)
202202
{
@@ -238,7 +238,7 @@ public async Task<IEnumerable<string>> GetEvents(string eventName, string eventK
238238
}
239239
}
240240

241-
public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
241+
public async Task MarkEventUnprocessed(string id)
242242
{
243243
lock (_events)
244244
{
@@ -250,7 +250,7 @@ public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
250250
}
251251
}
252252

253-
public async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken _ = default)
253+
public async Task PersistErrors(IEnumerable<ExecutionError> errors)
254254
{
255255
lock (errors)
256256
{

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
1818
_innerService = innerService;
1919
}
2020

21-
public Task<string> CreateEvent(Event newEvent, CancellationToken _ = default) => _innerService.CreateEvent(newEvent);
21+
public Task<string> CreateEvent(Event newEvent) => _innerService.CreateEvent(newEvent);
2222

23-
public Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) => _innerService.CreateEventSubscription(subscription);
23+
public Task<string> CreateEventSubscription(EventSubscription subscription) => _innerService.CreateEventSubscription(subscription);
2424

25-
public Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.CreateNewWorkflow(workflow);
25+
public Task<string> CreateNewWorkflow(WorkflowInstance workflow) => _innerService.CreateNewWorkflow(workflow);
2626

2727
public void EnsureStoreExists() => _innerService.EnsureStoreExists();
2828

@@ -42,22 +42,22 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
4242

4343
public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take);
4444

45-
public Task MarkEventProcessed(string id, CancellationToken _ = default) => _innerService.MarkEventProcessed(id);
45+
public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id);
4646

47-
public Task MarkEventUnprocessed(string id, CancellationToken _ = default) => _innerService.MarkEventUnprocessed(id);
47+
public Task MarkEventUnprocessed(string id) => _innerService.MarkEventUnprocessed(id);
4848

49-
public Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken _ = default) => _innerService.PersistErrors(errors);
49+
public Task PersistErrors(IEnumerable<ExecutionError> errors) => _innerService.PersistErrors(errors);
5050

51-
public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
51+
public Task PersistWorkflow(WorkflowInstance workflow) => _innerService.PersistWorkflow(workflow);
5252

53-
public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
53+
public Task TerminateSubscription(string eventSubscriptionId) => _innerService.TerminateSubscription(eventSubscriptionId);
5454
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);
5555

5656
public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf);
5757

58-
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
58+
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
5959

60-
public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
60+
public Task ClearSubscriptionToken(string eventSubscriptionId, string token) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
6161

6262
public Task ScheduleCommand(ScheduledCommand command)
6363
{

src/WorkflowCore/Services/SyncWorkflowRunner.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
7272
var id = Guid.NewGuid().ToString();
7373

7474
if (persistSate)
75-
id = await _persistenceStore.CreateNewWorkflow(wf, token);
75+
id = await _persistenceStore.CreateNewWorkflow(wf);
7676
else
7777
wf.Id = id;
7878

@@ -89,7 +89,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
8989
{
9090
await _executor.Execute(wf, token);
9191
if (persistSate)
92-
await _persistenceStore.PersistWorkflow(wf, token);
92+
await _persistenceStore.PersistWorkflow(wf);
9393
}
9494
}
9595
finally
@@ -103,4 +103,4 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
103103
return wf;
104104
}
105105
}
106-
}
106+
}

0 commit comments

Comments
 (0)