Skip to content

Commit

Permalink
Support nested batches
Browse files Browse the repository at this point in the history
* Parent batches will not complete until all child batches have been completed
  • Loading branch information
jpcamara committed Sep 25, 2024
1 parent 7d951b3 commit d01a9b2
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions app/models/solid_queue/job_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class JobBatch < Record
belongs_to :job, foreign_key: :job_id, optional: true
belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true
has_many :jobs, foreign_key: :batch_id
has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch"

serialize :on_finish_active_job, coder: JSON
serialize :on_success_active_job, coder: JSON
Expand All @@ -21,28 +22,33 @@ def current_batch_id
end

def enqueue(attributes = {})
previous_batch_id = current_batch_id.presence || nil

job_batch = nil
transaction do
job_batch = create!(batch_attributes(attributes))
ActiveSupport::IsolatedExecutionState[:current_batch_id] = job_batch.id
yield job_batch
wrap_in_batch_context(job_batch.id) do
yield job_batch
end
end

job_batch
ensure
ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
end

def dispatch_finished_batches
incomplete.order(:id).pluck(:id).each do |id|
transaction do
where(id: id).non_blocking_lock.each(&:finish)
where(id: id).includes(:children, :jobs).non_blocking_lock.each(&:finish)
end
end
end

def wrap_in_batch_context(batch_id)
previous_batch_id = current_batch_id.presence || nil
ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id
yield
ensure
ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
end

private

def batch_attributes(attributes)
Expand All @@ -62,6 +68,8 @@ def batch_attributes(attributes)
attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize
end

attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present?

attributes
end

Expand All @@ -74,16 +82,13 @@ def as_active_job(active_job_klass)
def enqueue(attributes = {})
raise "You cannot enqueue a batch that is already finished" if finished?

previous_batch_id = self.class.current_batch_id.presence || nil

transaction do
ActiveSupport::IsolatedExecutionState[:current_batch_id] = id
yield self
self.class.wrap_in_batch_context(id) do
yield self
end
end

self
ensure
ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
end

def finished?
Expand All @@ -110,6 +115,10 @@ def finish
return unless status.in?([ :finished, :failed ])
end

children.find_each do |child|
return unless child.finished?
end

if on_finish_active_job.present?
perform_completion_job(:on_finish_active_job, attrs)
end
Expand All @@ -118,7 +127,10 @@ def finish
perform_completion_job(:on_success_active_job, attrs)
end

update!({ finished_at: Time.zone.now }.merge(attrs))
transaction do
parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present?
update!({ finished_at: Time.zone.now }.merge(attrs))
end
end

private
Expand All @@ -133,7 +145,9 @@ def perform_completion_job(job_field, attrs)
active_job = ActiveJob::Base.deserialize(send(job_field))
active_job.send(:deserialize_arguments_if_needed)
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
ActiveJob.perform_all_later([ active_job ])
self.class.wrap_in_batch_context(id) do
ActiveJob.perform_all_later([ active_job ])
end
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
attrs[job_field] = active_job.serialize
end
Expand Down

0 comments on commit d01a9b2

Please sign in to comment.