From fb2ce6909e6161da7880f7be68739a6bac654525 Mon Sep 17 00:00:00 2001 From: Marco Date: Fri, 5 Sep 2025 12:15:06 -0400 Subject: [PATCH 1/4] Move On Cancellation Logic to After Failure Hook In order to allow individual jobs in the JobGroup to perform cleanup activities upon job failure, a JobGroup's on cancellation job must be enqueued in the after failure delayed job lifecycle hook. If both were to be performed in the same hook, the job's failure hook and the on cancellation job may run at the same time since hook execution order is never guaranteed. This is particularly important if the on cancellation job depends on state set in the failure hook of an individual job. This also forces the Delayed::Worker.destroy_failed_jobs to be set to false in order to prevent a race condition where a JobGroup could be completed (instead of cancelled) if a successful job occurs at the same time as a failing job. --- CHANGELOG.md | 10 ++++++++++ README.md | 10 ++++++++++ lib/delayed/job_groups/errors.rb | 16 ++++++++++++++++ lib/delayed/job_groups/plugin.rb | 8 +++++++- lib/delayed/job_groups/railtie.rb | 9 +++++++++ lib/delayed/job_groups/version.rb | 2 +- lib/delayed_job_groups_plugin.rb | 2 ++ spec/spec_helper.rb | 8 +++++--- 8 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 lib/delayed/job_groups/errors.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c8c5af..3d6d5a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 1.0.0 +### Breaking Changes +- This library will fail to load if `Delayed::Worker.destroy_failed_jobs` is set to true. + Delayed::Job sets this option to true by default, you will need to configure it to false + in order to include this library. +### Changes +- Moves `on_cancellation` logic from the before delayed job lifecycle hook to the after hook. +- Wrapped the job group cancel hook in a lock to prevent concurrently failing jobs from enqueueing + multiple on cancellation jobs. 
+ ## 0.14.0 - Reverts changes made in version 0.13. diff --git a/README.md b/README.md index 74be389..f68eba3 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,15 @@ Run the required database migrations: $ rails generate delayed_job_groups_plugin:install $ rake db:migrate +## Upgrading from 0.14.0 + +This library is now incompatible with Delayed::Job's default setting for `destroy_failed_jobs`. +In order to use Delayed Job Groups, you must set it to false while configuring Delayed::Job: + +```ruby +Delayed::Worker.destroy_failed_jobs = false +``` + ## Upgrading from 0.1.2 run the following generator to create a migration for the new configuration column. @@ -42,6 +51,7 @@ run the following generator to create a migration for the new configuration colu # add `default: true, null: false` to the generated migration for the failure_cancels_group column $ rake db:migrate + ## Usage Creating a job group and queueing some jobs: diff --git a/lib/delayed/job_groups/errors.rb b/lib/delayed/job_groups/errors.rb new file mode 100644 index 0000000..ee0393f --- /dev/null +++ b/lib/delayed/job_groups/errors.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Delayed + module JobGroups + ConfigurationError = Class.new(StandardError) + + class IncompatibleWithDelayedJobError < ConfigurationError + DEFAULT_MESSAGE = 'DelayedJobGroupsPlugin is incompatible with Delayed::Job ' \ + 'when `destroy_failed_jobs` is set to `true`' + + def initialize(msg = DEFAULT_MESSAGE) + super(msg) + end + end + end +end diff --git a/lib/delayed/job_groups/plugin.rb b/lib/delayed/job_groups/plugin.rb index 99495bc..48b07eb 100644 --- a/lib/delayed/job_groups/plugin.rb +++ b/lib/delayed/job_groups/plugin.rb @@ -17,7 +17,13 @@ def job.max_attempts end end - lifecycle.before(:failure) do |_worker, job| + # In order to allow individual jobs in the JobGroup to perform cleanup activities upon + # job failure, a JobGroups on cancellation job must be enqueued in the after failure + # delayed job 
lifecycle hook. If both were to be performed in the same hook, the job's + # failure hook and the on cancellation job may run at the same time since hook execution + # order is never guaranteed. This is particularly important if the on cancellation job + # depends on state set in the failure hook of an individual job. + lifecycle.after(:failure) do |_worker, job| # If a job in the job group fails, then cancel the whole job group. # Need to check that the job group is present since another # job may have concurrently cancelled it. diff --git a/lib/delayed/job_groups/railtie.rb b/lib/delayed/job_groups/railtie.rb index 7a7a210..1965ba1 100644 --- a/lib/delayed/job_groups/railtie.rb +++ b/lib/delayed/job_groups/railtie.rb @@ -4,6 +4,15 @@ module Delayed module JobGroups class Railtie < ::Rails::Railtie config.after_initialize do + + # On cancellation checks are performed in the after failure delayed job lifecycle, however + # https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/worker.rb#L268 may + # delete jobs before the hook runs. This could cause a successful job in the same group to + # complete the group instead of the group being cancelled. Therefore, we must ensure that + # the Delayed::Worker.destroy_failed_jobs is set to false, guaranteeing that the group is + # never empty if failure occurs. 
+ raise Delayed::JobGroups::IncompatibleWithDelayedJobError if Delayed::Worker.destroy_failed_jobs + Delayed::Backend::ActiveRecord::Job.include(Delayed::JobGroups::JobExtensions) end end diff --git a/lib/delayed/job_groups/version.rb b/lib/delayed/job_groups/version.rb index 674b79d..c6ece2a 100644 --- a/lib/delayed/job_groups/version.rb +++ b/lib/delayed/job_groups/version.rb @@ -2,6 +2,6 @@ module Delayed module JobGroups - VERSION = '0.14.0' + VERSION = '1.0.0' end end diff --git a/lib/delayed_job_groups_plugin.rb b/lib/delayed_job_groups_plugin.rb index 5179ed7..52d8663 100644 --- a/lib/delayed_job_groups_plugin.rb +++ b/lib/delayed_job_groups_plugin.rb @@ -4,6 +4,7 @@ require 'active_record' require 'delayed_job' require 'delayed_job_active_record' +require 'delayed/job_groups/errors' require 'delayed/job_groups/compatibility' require 'delayed/job_groups/complete_stuck_job_groups_job' require 'delayed/job_groups/job_extensions' @@ -18,6 +19,7 @@ else # Do the same as in the railtie Delayed::Backend::ActiveRecord::Job.include(Delayed::JobGroups::JobExtensions) + raise Delayed::JobGroups::IncompatibleWithDelayedJobError if Delayed::Worker.destroy_failed_jobs end Delayed::Worker.plugins << Delayed::JobGroups::Plugin diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 16e4f2d..e50f64c 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,6 +13,11 @@ require 'rspec/its' require 'database_cleaner' +require 'delayed_job' + +Delayed::Worker.read_ahead = 1 +Delayed::Worker.destroy_failed_jobs = false + require 'delayed_job_groups_plugin' require 'factory_bot' require 'yaml' @@ -23,9 +28,6 @@ FileUtils.makedirs('log') -Delayed::Worker.read_ahead = 1 -Delayed::Worker.destroy_failed_jobs = false - Delayed::Worker.logger = Logger.new('log/test.log') Delayed::Worker.logger.level = Logger::DEBUG ActiveRecord::Base.logger = Delayed::Worker.logger From c3f53e30044caf9f62017d08921641130a8b5d4e Mon Sep 17 00:00:00 2001 From: Marco Date: Fri, 12 Sep 2025 
16:12:22 -0400 Subject: [PATCH 2/4] Lock while enqueuing on cancellation job --- lib/delayed/job_groups/job_group.rb | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/delayed/job_groups/job_group.rb b/lib/delayed/job_groups/job_group.rb index e719b4d..e3adbbc 100644 --- a/lib/delayed/job_groups/job_group.rb +++ b/lib/delayed/job_groups/job_group.rb @@ -62,8 +62,16 @@ def unblock end def cancel - Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job - destroy + with_lock do + Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job + destroy + end + rescue ActiveRecord::RecordNotFound + # The first failing job to attempt cancelling the job group will enqueue the + # on cancellation job and destroy the group. Any other concurrently failing job + # in the same group can therefore silently return if the job group has already + # been destroyed. + nil end def check_for_completion(skip_pending_jobs_check: false) From 77f801405b0d32f268f0c618dbe38049777d60be Mon Sep 17 00:00:00 2001 From: Marco Date: Fri, 12 Sep 2025 16:12:22 -0400 Subject: [PATCH 3/4] Raise error if group is already cancelled --- lib/delayed/job_groups/job_group.rb | 2 ++ spec/delayed/job_groups/job_group_spec.rb | 34 ++++++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/lib/delayed/job_groups/job_group.rb b/lib/delayed/job_groups/job_group.rb index e3adbbc..68954f5 100644 --- a/lib/delayed/job_groups/job_group.rb +++ b/lib/delayed/job_groups/job_group.rb @@ -63,6 +63,8 @@ def unblock def cancel with_lock do + return if destroyed? 
+ Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job destroy end diff --git a/spec/delayed/job_groups/job_group_spec.rb b/spec/delayed/job_groups/job_group_spec.rb index 92da6b3..7e5b7e4 100644 --- a/spec/delayed/job_groups/job_group_spec.rb +++ b/spec/delayed/job_groups/job_group_spec.rb @@ -299,11 +299,28 @@ end describe "#cancel" do + subject(:job_group) do + create( + :job_group, + on_completion_job: on_completion_job, + on_completion_job_options: on_completion_job_options, + on_cancellation_job: on_cancellation_job, + on_cancellation_job_options: on_cancellation_job_options, + blocked: blocked + ) + end + let!(:queued_job) { Delayed::Job.create!(job_group_id: job_group.id) } let!(:running_job) { Delayed::Job.create!(job_group_id: job_group.id, locked_at: Time.now, locked_by: 'test') } + let(:before_hook) {} + let(:on_cancellation_job) { 'dummy job' } + let(:on_cancellation_job_options) do + { foo: 'bar' } + end + let(:cancel) { true } before do - job_group.cancel + job_group.cancel if cancel end it "destroys the job group" do @@ -317,6 +334,21 @@ it "does not destroy running jobs" do expect(running_job).not_to have_been_destroyed end + + context "when already cancelled" do + let(:cancel) { false } + + it "skips cancel block if already cancelled" do + other = Delayed::JobGroups::JobGroup.find(job_group.id) + job_group.cancel + + other.cancel + other.cancel + + expect(Delayed::Job) + .to have_received(:enqueue).with(on_cancellation_job, on_cancellation_job_options).once + end + end end describe "#failure_cancels_group?" 
do From e709a9dc69c0088e4890eef5d4f9831e88dbec75 Mon Sep 17 00:00:00 2001 From: Marco Date: Fri, 26 Sep 2025 14:50:39 -0400 Subject: [PATCH 4/4] Set Delayed::Worker option in initializer --- lib/delayed/job_groups/railtie.rb | 4 ++- .../install_generator.rb | 32 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/lib/delayed/job_groups/railtie.rb b/lib/delayed/job_groups/railtie.rb index 1965ba1..efb39ca 100644 --- a/lib/delayed/job_groups/railtie.rb +++ b/lib/delayed/job_groups/railtie.rb @@ -11,7 +11,9 @@ class Railtie < ::Rails::Railtie # complete the group instead of the group being cancelled. Therefore, we must ensure that # the Delayed::Worker.destroy_failed_jobs is set to false, guaranteeing that the group is # never empty if failure occurs. - raise Delayed::JobGroups::IncompatibleWithDelayedJobError if Delayed::Worker.destroy_failed_jobs + if Delayed::Worker.destroy_failed_jobs && ActiveRecord::Base.connection.table_exists?(:delayed_job_groups) + raise Delayed::JobGroups::IncompatibleWithDelayedJobError + end Delayed::Backend::ActiveRecord::Job.include(Delayed::JobGroups::JobExtensions) end diff --git a/lib/generators/delayed_job_groups_plugin/install_generator.rb b/lib/generators/delayed_job_groups_plugin/install_generator.rb index 8b9767d..3c77b80 100644 --- a/lib/generators/delayed_job_groups_plugin/install_generator.rb +++ b/lib/generators/delayed_job_groups_plugin/install_generator.rb @@ -18,5 +18,37 @@ def self.next_migration_number(dirname) ActiveRecord::Generators::Base.next_migration_number(dirname) end + def create_initializer + initializer_file = File.join('config/initializers', 'delayed_job_config.rb') + configuration_on_matcher = /Delayed::Worker\.destroy_failed_jobs\s*=\s*true/ + configuration_off_matcher = /Delayed::Worker\.destroy_failed_jobs\s*=\s*false/ + + say 'Attempting to initialize delayed_job_config initializer...', :green + + if File.exist?(initializer_file) + say 'delayed_job_config initializer already 
exists... checking destroy_failed_jobs options', :green + contents = File.read(initializer_file) + if contents.match(configuration_on_matcher) + say 'Delayed::Worker.destroy_failed_jobs is set to true', :red + say 'This library requires the option to be set to false, updating config now!', :yellow + + gsub_file initializer_file, configuration_on_matcher, 'Delayed::Worker.destroy_failed_jobs = false' + elsif contents.match(configuration_off_matcher) + say 'Delayed::Worker.destroy_failed_jobs is set to false; nothing to do!', :green + else + say 'Delayed::Worker.destroy_failed_jobs is not set' + say 'This library requires the option to be set to false, updating config now!', :yellow + inject_into_file initializer_file, "Delayed::Worker.destroy_failed_jobs = false\n" + end + else + create_file initializer_file do + <<~RUBY + # frozen_string_literal: true + + Delayed::Worker.destroy_failed_jobs = false + RUBY + end + end + end end end