Skip to content

Commit d674ba7

Browse files
add before_started, after_completed and after_failed functionality
1 parent 2529920 commit d674ba7

File tree

11 files changed

+641
-41
lines changed

11 files changed

+641
-41
lines changed

README.md

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ end
113113
process = MyProcess.create_process Date.today, :option_1 => true
114114
```
115115

116-
_NOTE:_ The current implementation performs a naive check on the count of arguments.
116+
_NOTE:_ The current implementation performs a naïve check on the count of arguments.
117117

118118
Next, specify the tasks with their corresponding implementation methods, that make up the
119119
process, using the `task` method and providing the `method` to execute for the task.
@@ -291,6 +291,44 @@ module MyProcess
291291
end
292292
```
293293

294+
#### Before Process Started and After Process Completion or Failure
295+
296+
You may want to run further tasks asynchrously before or after a process has completed
297+
or failed. These tasks provide a way to execute logic independently of the process.
298+
299+
Specify these tasks using the `before_started`, `after_completed` or `after_failed` methods.
300+
301+
For example, using `after_completed` to set off another business process or `after_failed` to
302+
send an email to an operator.
303+
304+
```ruby
305+
module MyProcess
306+
extend Taskinator::Definition
307+
308+
# defines a process
309+
define_process do
310+
311+
# tasks, sub-process, etc.
312+
313+
# define task to execute on completion
314+
after_completed :further_process
315+
316+
# define task to execute on failure
317+
after_failed :email_operations
318+
319+
end
320+
321+
def further_process
322+
# ...
323+
end
324+
325+
def email_operations
326+
# ...
327+
end
328+
329+
end
330+
```
331+
294332
#### Complex Process Definitions
295333

296334
Any combination or nesting of `task`, `sequential`, `concurrent` and `for_each` steps are

lib/taskinator/builder.rb

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,32 @@ def job(job, options={})
8383
nil
8484
end
8585

86-
# TODO: add mailer
87-
# TODO: add complete!
88-
# TODO: add fail!
86+
# defines a task which executes the given @method before the process has started
87+
def before_started(method, options={})
88+
raise ArgumentError, 'method' if method.nil?
89+
raise NoMethodError, method unless @executor.respond_to?(method)
90+
91+
define_before_started_task(@process, method, @args, options)
92+
nil
93+
end
94+
95+
# defines a task which executes the given @method after the process has completed
96+
def after_completed(method, options={})
97+
raise ArgumentError, 'method' if method.nil?
98+
raise NoMethodError, method unless @executor.respond_to?(method)
99+
100+
define_after_completed_task(@process, method, @args, options)
101+
nil
102+
end
103+
104+
# defines a task which executes the given @method after the process has failed
105+
def after_failed(method, options={})
106+
raise ArgumentError, 'method' if method.nil?
107+
raise NoMethodError, method unless @executor.respond_to?(method)
108+
109+
define_after_failed_task(@process, method, @args, options)
110+
nil
111+
end
89112

90113
# defines a sub process task, for the given @definition
91114
# the definition specified must have input compatible arguments
@@ -104,13 +127,31 @@ def sub_process(definition, options={})
104127
private
105128

106129
def define_step_task(process, method, args, options={})
107-
define_task(process) {
130+
add_task(process.tasks) {
108131
Task.define_step_task(process, method, args, combine_options(options))
109132
}
110133
end
111134

135+
def define_before_started_task(process, method, args, options={})
136+
add_task(process.before_started_tasks) {
137+
Task.define_hook_task(process, method, args, combine_options(options))
138+
}
139+
end
140+
141+
def define_after_completed_task(process, method, args, options={})
142+
add_task(process.after_completed_tasks) {
143+
Task.define_hook_task(process, method, args, combine_options(options))
144+
}
145+
end
146+
147+
def define_after_failed_task(process, method, args, options={})
148+
add_task(process.after_failed_tasks) {
149+
Task.define_hook_task(process, method, args, combine_options(options))
150+
}
151+
end
152+
112153
def define_job_task(process, job, args, options={})
113-
define_task(process) {
154+
add_task(process.tasks) {
114155
Task.define_job_task(process, job, args, combine_options(options))
115156
}
116157
end
@@ -119,8 +160,8 @@ def define_sub_process_task(process, sub_process, options={})
119160
Task.define_sub_process_task(process, sub_process, combine_options(options))
120161
end
121162

122-
def define_task(process)
123-
process.tasks << task = yield
163+
def add_task(list)
164+
list << task = yield
124165
task
125166
end
126167

lib/taskinator/persistence.rb

Lines changed: 99 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,19 @@ def visit_process(attribute)
274274
end
275275

276276
def visit_tasks(tasks)
277-
tasks.each do |task|
278-
RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
279-
@conn.rpush "#{@key}:tasks", task.uuid
280-
unless task.is_a?(Task::SubProcess)
281-
incr_task_count unless self == @base_visitor
282-
@base_visitor.incr_task_count
283-
end
284-
end
285-
@conn.set("#{@key}.count", tasks.count)
286-
@conn.set("#{@key}.pending", tasks.count)
277+
_visit_tasks(tasks)
278+
end
279+
280+
def visit_before_started_tasks(tasks)
281+
_visit_tasks(tasks, ':before_started')
282+
end
283+
284+
def visit_after_completed_tasks(tasks)
285+
_visit_tasks(tasks, ':after_completed')
286+
end
287+
288+
def visit_after_failed_tasks(tasks)
289+
_visit_tasks(tasks, ':after_failed')
287290
end
288291

289292
def visit_attribute(attribute)
@@ -334,6 +337,21 @@ def task_count
334337
def incr_task_count
335338
@task_count += 1
336339
end
340+
341+
private
342+
343+
def _visit_tasks(tasks, list='')
344+
tasks.each do |task|
345+
RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
346+
@conn.rpush "#{@key}#{list}:tasks", task.uuid
347+
unless task.is_a?(Task::SubProcess)
348+
incr_task_count unless self == @base_visitor
349+
@base_visitor.incr_task_count
350+
end
351+
end
352+
@conn.set("#{@key}#{list}.count", tasks.count)
353+
@conn.set("#{@key}#{list}.pending", tasks.count)
354+
end
337355
end
338356

339357
class XmlSerializationVisitor < Taskinator::Visitor::Base
@@ -386,17 +404,19 @@ def visit_process(attribute)
386404
end
387405

388406
def visit_tasks(tasks)
389-
builder.tag!('tasks', :count => tasks.count) do |xml|
390-
tasks.each do |task|
391-
xml.tag!('task', :key => task.key) do |xml2|
392-
XmlSerializationVisitor.new(xml2, task, @base_visitor).visit
393-
unless task.is_a?(Task::SubProcess)
394-
incr_task_count unless self == @base_visitor
395-
@base_visitor.incr_task_count
396-
end
397-
end
398-
end
399-
end
407+
_visit_tasks(tasks)
408+
end
409+
410+
def visit_before_started_tasks(tasks)
411+
_visit_tasks(tasks, 'before_started')
412+
end
413+
414+
def visit_after_completed_tasks(tasks)
415+
_visit_tasks(tasks, 'after_completed')
416+
end
417+
418+
def visit_after_failed_tasks(tasks)
419+
_visit_tasks(tasks, 'after_failed')
400420
end
401421

402422
def visit_attribute(attribute)
@@ -446,6 +466,22 @@ def task_count
446466
def incr_task_count
447467
@task_count += 1
448468
end
469+
470+
private
471+
472+
def _visit_tasks(tasks, list='tasks')
473+
builder.tag!(list, :count => tasks.count) do |xml|
474+
tasks.each do |task|
475+
xml.tag!('task', :key => task.key) do |xml2|
476+
XmlSerializationVisitor.new(xml2, task, @base_visitor).visit
477+
unless task.is_a?(Task::SubProcess)
478+
incr_task_count unless self == @base_visitor
479+
@base_visitor.incr_task_count
480+
end
481+
end
482+
end
483+
end
484+
end
449485
end
450486

451487
class UnknownTypeError < StandardError
@@ -541,11 +577,19 @@ def visit_process(attribute)
541577
end
542578

543579
def visit_tasks(tasks)
544-
# tasks are a linked list, so just get the first one
545-
Taskinator.redis do |conn|
546-
uuid = conn.lindex("#{@key}:tasks", 0)
547-
tasks.attach(lazy_instance_for(Task, uuid), conn.get("#{@key}.count").to_i) if uuid
548-
end
580+
_visit_tasks(tasks)
581+
end
582+
583+
def visit_before_started_tasks(tasks)
584+
_visit_tasks(tasks, ':before_started')
585+
end
586+
587+
def visit_after_completed_tasks(tasks)
588+
_visit_tasks(tasks, ':after_completed')
589+
end
590+
591+
def visit_after_failed_tasks(tasks)
592+
_visit_tasks(tasks, ':after_failed')
549593
end
550594

551595
def visit_process_reference(attribute)
@@ -607,6 +651,14 @@ def visit_args(attribute)
607651

608652
private
609653

654+
def _visit_tasks(tasks, list='')
655+
# tasks are a linked list, so just get the first one
656+
Taskinator.redis do |conn|
657+
uuid = conn.lindex("#{@key}#{list}:tasks", 0)
658+
tasks.attach(lazy_instance_for(Task, uuid), conn.get("#{@key}#{list}.count").to_i) if uuid
659+
end
660+
end
661+
610662
#
611663
# creates a proxy for the instance which
612664
# will only fetch the instance when used
@@ -649,14 +701,31 @@ def visit_process(attribute)
649701
end
650702

651703
def visit_tasks(tasks)
652-
@conn.expire "#{@key}:tasks", expire_in
653-
@conn.expire "#{@key}.count", expire_in
654-
@conn.expire "#{@key}.pending", expire_in
704+
_visit_tasks(tasks)
705+
end
706+
707+
def visit_before_started_tasks(tasks)
708+
_visit_tasks(tasks, ':before_started')
709+
end
710+
711+
def visit_after_completed_tasks(tasks)
712+
_visit_tasks(tasks, ':after_completed')
713+
end
714+
715+
def visit_after_failed_tasks(tasks)
716+
_visit_tasks(tasks, ':after_failed')
717+
end
718+
719+
private
720+
721+
def _visit_tasks(tasks, list='')
722+
@conn.expire "#{@key}#{list}:tasks", expire_in
723+
@conn.expire "#{@key}#{list}.count", expire_in
724+
@conn.expire "#{@key}#{list}.pending", expire_in
655725
tasks.each do |task|
656726
RedisCleanupVisitor.new(@conn, task, expire_in).visit
657727
end
658728
end
659-
660729
end
661730

662731
# lazily loads the object specified by the type and uuid

lib/taskinator/process.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,18 @@ def tasks
5555
@tasks ||= Tasks.new
5656
end
5757

58+
def before_started_tasks
59+
@before_started_tasks ||= Tasks.new
60+
end
61+
62+
def after_completed_tasks
63+
@after_completed_tasks ||= Tasks.new
64+
end
65+
66+
def after_failed_tasks
67+
@after_failed_tasks ||= Tasks.new
68+
end
69+
5870
def no_tasks_defined?
5971
tasks.empty?
6072
end
@@ -64,6 +76,9 @@ def accept(visitor)
6476
visitor.visit_task_reference(:parent)
6577
visitor.visit_type(:definition)
6678
visitor.visit_tasks(tasks)
79+
visitor.visit_before_started_tasks(before_started_tasks)
80+
visitor.visit_after_completed_tasks(after_completed_tasks)
81+
visitor.visit_after_failed_tasks(after_failed_tasks)
6782
visitor.visit_args(:options)
6883
visitor.visit_attribute(:scope)
6984
visitor.visit_attribute(:queue)
@@ -92,6 +107,9 @@ def enqueue!
92107
def start!
93108
return if paused? || cancelled?
94109

110+
# enqueue before started tasks independently
111+
before_started_tasks.each(&:enqueue!)
112+
95113
transition(:processing) do
96114
instrument('taskinator.process.processing', processing_payload) do
97115
start
@@ -132,6 +150,9 @@ def complete!
132150
end
133151
end
134152
end
153+
154+
# enqueue completion tasks independently
155+
after_completed_tasks.each(&:enqueue!)
135156
end
136157

137158
# TODO: add retry method - to pick up from a failed task
@@ -159,6 +180,9 @@ def fail!(error)
159180
parent.fail!(error) unless parent.nil?
160181
end
161182
end
183+
184+
# enqueue completion tasks independently
185+
after_failed_tasks.each(&:enqueue!)
162186
end
163187

164188
def task_failed(task, error)

0 commit comments

Comments
 (0)