Skip to content

Added support for executing a sub-workflow #1363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/WorkflowCore.Testing/WorkflowCore.Testing.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.0" />
<PackageReference Include="xunit.abstractions" Version="2.0.3" />
</ItemGroup>

<ItemGroup>
Expand Down
10 changes: 10 additions & 0 deletions src/WorkflowCore/Interface/IWorkflowModifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,15 @@ IStepBuilder<TData, Activity> Activity(string activityName, Expression<Func<TDat
/// <returns></returns>
IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);

/// <summary>
/// Execute a sub-workflow
/// </summary>
/// <param name="subWorkflowId">Id of the sub-workflow to start</param>
/// <param name="parameters">The data to pass to the sub-workflow</param>
/// <param name="cancelCondition">A condition that when true will cancel this sub-workflow</param>
/// <returns></returns>
IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(string subWorkflowId, Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, bool>> cancelCondition = null);
}
}
4 changes: 4 additions & 0 deletions src/WorkflowCore/Models/ExecutionPointer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public IReadOnlyCollection<string> Scope
get => _scope;
set => _scope = new List<string>(value);
}

public bool IsComplete => Status == PointerStatus.Complete;

public bool HasChildren => Children?.Count > 0;
}

public enum PointerStatus
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace WorkflowCore.Models.LifeCycleEvents
{
public class SubWorkflowLifeCycleEvent : LifeCycleEvent
{

}
}
49 changes: 47 additions & 2 deletions src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,60 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using WorkflowCore.Models.LifeCycleEvents;

namespace WorkflowCore.Primitives
{
public class SubWorkflowStepBody : StepBody
{
private readonly IScopeProvider _scopeProvider;

public SubWorkflowStepBody(IScopeProvider scopeProvider)
{
_scopeProvider = scopeProvider;
}

public override ExecutionResult Run(IStepExecutionContext context)
{
// TODO: What is this supposed to do?
throw new NotImplementedException();
var scope = _scopeProvider.CreateScope(context);
var workflowController = scope.ServiceProvider.GetRequiredService<IWorkflowController>();
var logger = scope.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(
typeof(SubWorkflowStepBody).Namespace + "." + nameof(SubWorkflowStepBody));

if (!context.ExecutionPointer.EventPublished)
{
var result = workflowController.StartWorkflow(SubWorkflowId, context.Workflow.Data, context.Workflow.Id).Result;

logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})",
SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id);

logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event SubWorkflowLifeCycleEvent with key='{EventKey}'",
SubWorkflowId, result, result);

var effectiveDate = DateTime.MinValue;
return ExecutionResult.WaitForEvent(nameof(SubWorkflowLifeCycleEvent), result, effectiveDate);
}

logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId,
context.ExecutionPointer.EventKey);

var persistenceProvider = scope.ServiceProvider.GetRequiredService<IPersistenceProvider>();
var workflowInstance = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result;
if (workflowInstance.Status == WorkflowStatus.Terminated)
{
throw new NotImplementedException(workflowInstance.Status.ToString());
}

Result = workflowInstance.Data;
return ExecutionResult.Next();
}

public string SubWorkflowId { get; set; }

public object Parameters { get; set; }

public object Result { get; set; }
}
}
3 changes: 2 additions & 1 deletion src/WorkflowCore/Primitives/WaitFor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public override ExecutionResult Run(IStepExecutionContext context)
effectiveDate = EffectiveDate;
}

return ExecutionResult.WaitForEvent(EventName, EventKey, effectiveDate);
var eventKey = context.Workflow.Reference ?? EventKey;
return ExecutionResult.WaitForEvent(EventName, eventKey, effectiveDate);
}

EventData = context.ExecutionPointer.EventData;
Expand Down
1 change: 1 addition & 0 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
services.AddTransient<ISyncWorkflowRunner, SyncWorkflowRunner>();

services.AddTransient<Foreach>();
services.AddTransient<SubWorkflowStepBody>();

return services;
}
Expand Down
10 changes: 10 additions & 0 deletions src/WorkflowCore/Services/ExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ public ExecutionResultProcessor(IExecutionPointerFactory pointerFactory, IDateTi

public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult)
{
var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";

pointer.PersistenceData = result.PersistenceData;
pointer.Outcome = result.OutcomeValue;
if (result.SleepFor.HasValue)
{
pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value);
pointer.Status = PointerStatus.Sleeping;
_logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) will sleep for {SleepUntil}",
stepInfo, workflow.WorkflowDefinitionId, workflow.Id, result.SleepFor.Value);
}

if (!string.IsNullOrEmpty(result.EventName))
Expand All @@ -54,6 +58,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
SubscribeAsOf = result.EventAsOf,
SubscriptionData = result.SubscriptionData
});

_logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) waiting for event {EventName}",
stepInfo, workflow.WorkflowDefinitionId, workflow.Id, pointer.EventName);
}

if (result.Proceed)
Expand Down Expand Up @@ -87,6 +94,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Version = workflow.Version
});

_logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) completed",
stepInfo, workflow.WorkflowDefinitionId, workflow.Id);
}
else
{
Expand Down
29 changes: 29 additions & 0 deletions src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -588,5 +588,34 @@ public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecut
Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
}

public IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(
string subWorkflowId,
Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, bool>> cancelCondition = null)
{
var newStep = new WorkflowStep<SubWorkflowStepBody>();
newStep.CancelCondition = cancelCondition;

WorkflowBuilder.AddStep(newStep);
var stepBuilder = new StepBuilder<TData, SubWorkflowStepBody>(WorkflowBuilder, newStep);
stepBuilder.Input((step) => step.SubWorkflowId, (data) => subWorkflowId);

if (parameters != null)
stepBuilder.Input((step) => step.Parameters, parameters);

// use the result of the sub workflow as an output
// merge it with parent workflow data
stepBuilder.Output((body, data) =>
{
foreach (var prop in typeof(TData).GetProperties())
{
prop.SetValue(data, prop.GetValue(body.Result));
}
});

Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
}
}
}
6 changes: 6 additions & 0 deletions src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,16 @@ public IStepBuilder<TData, Activity> Activity(string activityName, Expression<Fu
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}

public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null)
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}

public IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(string subWorkflowId, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, bool>> cancelCondition = null)
{
return Start().SubWorkflow(subWorkflowId, parameters, cancelCondition);
}

private IStepBuilder<TData, InlineStepBody> Start()
{
Expand Down
45 changes: 42 additions & 3 deletions src/WorkflowCore/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Can
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.UtcNow,
Message = ex.Message
Message = ex.ToString()
});

_executionResultProcessor.HandleStepException(workflow, def, pointer, step, ex);
Expand Down Expand Up @@ -156,9 +156,11 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe
CancellationToken = cancellationToken
};

var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";

using (var scope = _scopeProvider.CreateScope(context))
{
_logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id);
_logger.LogDebug("Starting step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId})", stepInfo, workflow.WorkflowDefinitionId, workflow.Id);

IStepBody body = step.ConstructBody(scope.ServiceProvider);
var stepExecutor = scope.ServiceProvider.GetRequiredService<IStepExecutor>();
Expand Down Expand Up @@ -221,6 +223,13 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo

if (workflow.Status == WorkflowStatus.Complete)
{
await OnComplete(workflow, def);
return;
}

if (workflow.Status == WorkflowStatus.Terminated)
{
await OnTerminated(workflow, def);
return;
}

Expand All @@ -236,7 +245,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
workflow.NextExecution = Math.Min(pointerSleep, workflow.NextExecution ?? pointerSleep);
}

foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List<string>()).Count > 0))
foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && x.HasChildren))
{
if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue))
continue;
Expand All @@ -256,6 +265,11 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
return;
}

await OnComplete(workflow, def);
}

private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def)
{
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.UtcNow;

Expand All @@ -264,6 +278,8 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
await middlewareRunner.RunPostMiddleware(workflow, def);
}

_logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) completed", workflow.WorkflowDefinitionId, workflow.Id);

_publisher.PublishNotification(new WorkflowCompleted
{
Expand All @@ -274,5 +290,28 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
Version = workflow.Version
});
}

private async Task OnTerminated(WorkflowInstance workflow, WorkflowDefinition def)
{
workflow.Status = WorkflowStatus.Terminated;
workflow.CompleteTime = _datetimeProvider.UtcNow;

using (var scope = _serviceProvider.CreateScope())
{
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
await middlewareRunner.RunPostMiddleware(workflow, def);
}

_logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) terminated", workflow.WorkflowDefinitionId, workflow.Id);

_publisher.PublishNotification(new WorkflowTerminated
{
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
WorkflowInstanceId = workflow.Id,
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Version = workflow.Version
});
}
}
}
9 changes: 9 additions & 0 deletions src/WorkflowCore/Services/WorkflowHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ public Task<bool> TerminateWorkflow(string workflowId)

public void HandleLifeCycleEvent(LifeCycleEvent evt)
{
switch (evt)
{
// publish the event as sub workflow lifecycle event
case WorkflowCompleted _:
case WorkflowTerminated _:
_workflowController.PublishEvent(nameof(SubWorkflowLifeCycleEvent), evt.WorkflowInstanceId, evt.Reference);
break;
}

OnLifeCycleEvent?.Invoke(evt);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id, CancellationT
{
using (var db = ConstructDbContext())
{
if (!Guid.TryParse(Id, out _))
{

}
var uid = new Guid(Id);
var raw = await db.Set<PersistedWorkflow>()
.Include(wf => wf.ExecutionPointers)
Expand Down
Loading