-
Notifications
You must be signed in to change notification settings - Fork 7
Prevent on_cancellation_job & on_completion_job deserialization failure blocking cleanup #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
7843212
844e506
630af06
593cdfa
a4dd750
fd11003
08bd18d
26f58af
8903a2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# frozen_string_literal: true | ||
|
||
module Delayed | ||
module JobGroups | ||
class Configuration | ||
attr_accessor :error_reporter | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,7 +48,21 @@ def unblock | |
end | ||
|
||
def cancel | ||
Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job | ||
job = nil | ||
job_options = nil | ||
|
||
# Deserialization of the job or its options can fail | ||
begin | ||
job = on_cancellation_job | ||
job_options = on_cancellation_job_options | ||
rescue StandardError => e | ||
Delayed::Worker.logger.info('Failed to deserialize the on_cancellation_job or on_cancellation_job_options ' \ | ||
"for job_group_id=#{id}. Skipping on_cancellation_job to clean up job group.") | ||
error_reporter.call(e) if error_reporter | ||
end | ||
|
||
Delayed::Job.enqueue(job, job_options || {}) if job | ||
|
||
destroy | ||
end | ||
|
||
|
@@ -80,9 +94,26 @@ def ready_for_completion? | |
end | ||
|
||
def complete | ||
Delayed::Job.enqueue(on_completion_job, on_completion_job_options || {}) if on_completion_job | ||
job = nil | ||
job_options = nil | ||
|
||
# Deserialization of the job or its options can fail | ||
begin | ||
job = on_completion_job | ||
job_options = on_completion_job_options | ||
rescue StandardError => e | ||
|
||
Delayed::Worker.logger.info('Failed to deserialize the on_completion_job or on_completion_job_options for ' \ | ||
"job_group_id=#{id}. Skipping on_completion_job to clean up job group.") | ||
error_reporter.call(e) if error_reporter | ||
|
||
end | ||
|
||
Delayed::Job.enqueue(job, job_options || {}) if job | ||
destroy | ||
end | ||
|
||
def error_reporter | ||
Delayed::JobGroups.configuration.error_reporter | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,6 @@ | |
|
||
module Delayed | ||
module JobGroups | ||
VERSION = '0.7.0' | ||
VERSION = '0.8.0' | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,6 +140,47 @@ | |
expect(job_group).to have_been_destroyed | ||
end | ||
end | ||
|
||
context "on_completion_job refers to missing class" do | ||
# The on_completion_job needs the class to be defined this way in order to serialize it | ||
# rubocop:disable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock | ||
module Delayed::JobGroups::JobGroupTestHelper | ||
|
||
class OnCompletionJob | ||
|
||
end | ||
end | ||
# rubocop:enable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock | ||
|
||
let(:error_reporter) { Proc.new { |_error| } } | ||
|
||
around do |example| | ||
original_error_reporter = Delayed::JobGroups.configuration.error_reporter | ||
Delayed::JobGroups.configuration.error_reporter = error_reporter | ||
example.run | ||
Delayed::JobGroups.configuration.error_reporter = original_error_reporter | ||
end | ||
|
||
before { allow(error_reporter).to receive(:call) } | ||
|
||
it "handles missing on_completion_job" do | ||
on_completion_job = Delayed::JobGroups::JobGroupTestHelper::OnCompletionJob.new | ||
job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: on_completion_job, | ||
on_completion_job_options: {}) | ||
job = Delayed::Job.create!(job_group_id: job_group.id) | ||
job_group.mark_queueing_complete | ||
job.destroy | ||
|
||
# Remove the class for on_completion_job | ||
Delayed::JobGroups::JobGroupTestHelper.module_eval do | ||
remove_const 'OnCompletionJob' | ||
end | ||
|
||
# Deserialization fails | ||
expect { Delayed::JobGroups::JobGroup.check_for_completion(job_group.id) }.not_to raise_error | ||
expect(error_reporter).to have_received(:call) | ||
expect(job_group).to have_been_destroyed | ||
end | ||
end | ||
end | ||
|
||
describe "#enqueue" do | ||
|
@@ -212,20 +253,60 @@ | |
let!(:queued_job) { Delayed::Job.create!(job_group_id: job_group.id) } | ||
let!(:running_job) { Delayed::Job.create!(job_group_id: job_group.id, locked_at: Time.now, locked_by: 'test') } | ||
|
||
before do | ||
job_group.cancel | ||
end | ||
context "with no on_cancellation_job" do | ||
before do | ||
job_group.cancel | ||
end | ||
|
||
it "destroys the job group" do | ||
expect(job_group).to have_been_destroyed | ||
end | ||
it "destroys the job group" do | ||
expect(job_group).to have_been_destroyed | ||
end | ||
|
||
it "destroys queued jobs" do | ||
expect(queued_job).to have_been_destroyed | ||
it "destroys queued jobs" do | ||
expect(queued_job).to have_been_destroyed | ||
end | ||
|
||
it "does not destroy running jobs" do | ||
expect(running_job).not_to have_been_destroyed | ||
end | ||
end | ||
|
||
it "does not destroy running jobs" do | ||
expect(running_job).not_to have_been_destroyed | ||
context "on_cancellation_job refers to missing class" do | ||
# The on_cancellation_job needs the class to be defined this way in order to serialize it | ||
# rubocop:disable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock | ||
module Delayed::JobGroups::JobGroupTestHelper | ||
class OnCancellationJob | ||
|
||
end | ||
end | ||
# rubocop:enable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock | ||
|
||
let(:error_reporter) { Proc.new { |_error| } } | ||
|
||
around do |example| | ||
original_error_reporter = Delayed::JobGroups.configuration.error_reporter | ||
Delayed::JobGroups.configuration.error_reporter = error_reporter | ||
example.run | ||
Delayed::JobGroups.configuration.error_reporter = original_error_reporter | ||
end | ||
|
||
before { allow(error_reporter).to receive(:call) } | ||
|
||
it "handles missing on_cancellation_job" do | ||
on_cancellation_job = Delayed::JobGroups::JobGroupTestHelper::OnCancellationJob.new | ||
job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: on_cancellation_job, | ||
on_cancellation_job_options: {}) | ||
|
||
# Remove the class for on_cancellation_job | ||
Delayed::JobGroups::JobGroupTestHelper.module_eval do | ||
remove_const 'OnCancellationJob' | ||
end | ||
|
||
# Deserialization fails | ||
expect { job_group.cancel }.not_to raise_error | ||
expect(error_reporter).to have_received(:call) | ||
expect(job_group).to have_been_destroyed | ||
end | ||
end | ||
end | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we only catch
Delayed::DeserializationError
s so we'll crash and retry for other types of errors?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my scenario, I got an
ArgumentError
. Checking out https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/backend/base.rb#L73, it looks like quite a long list of errors that can be generated byYAML.load_dj(handler)
. I'm happy to copy paste all of those errors in here if that's preferred.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yuck. I was hoping it was just a single exception to catch. It's not ideal but I think copying that list of errors will avoid mistakenly rescuing some classes of errors.