Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter by date #209

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ ActiveJob.jobs.finished.where(job_class_name: "SomeJob")
# For adapters that support filtering by worker:
# All jobs in progress being run by a given worker
ActiveJob.jobs.in_progress.where(worker_id: 42)

# Using date filters
# You can filter by: enqueued_at, scheduled_at or finished_at
ActiveJob.jobs.pending.where(enqueued_at: 2.days.ago)
ActiveJob.jobs.pending.where(scheduled_at: Date.today)

date_range = (Time.parse("2024-11-01")..Time.parse("2024-12-01"))
ActiveJob.jobs.finished.where(finished_at: date_range)
```

Some examples of bulk operations:
Expand Down
36 changes: 31 additions & 5 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class ActiveJob::JobsRelation
include Enumerable

STATUSES = %i[ pending failed in_progress blocked scheduled finished ]
FILTERS = %i[ queue_name job_class_name ]
FILTERS = %i[ queue_name job_class_name finished_at scheduled_at enqueued_at ]

PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id ]
PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at scheduled_at enqueued_at ]
attr_reader *PROPERTIES, :default_page_size

delegate :last, :[], :reverse, to: :to_a
Expand All @@ -51,12 +51,15 @@ def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size:
# * <tt>:queue_name</tt> - To only include the jobs in the provided queue.
# * <tt>:worker_id</tt> - To only include the jobs processed by the provided worker.
# * <tt>:recurring_task_id</tt> - To only include the jobs corresponding to runs of a recurring task.
def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil)
def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil, scheduled_at: nil, enqueued_at: nil)
# Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses
arguments = { job_class_name: job_class_name,
queue_name: queue_name,
worker_id: worker_id,
recurring_task_id: recurring_task_id
recurring_task_id: recurring_task_id,
finished_at: finished_at,
scheduled_at: scheduled_at,
enqueued_at: enqueued_at
}.compact.collect { |key, value| [ key, value.to_s ] }.to_h

clone_with **arguments
Expand Down Expand Up @@ -264,8 +267,31 @@ def filter(jobs)
jobs.filter { |job| satisfy_filter?(job) }
end

def satisfy_date_filter?(filter_value, job_value)
return false if job_value.nil?

# Treat date ranges
if filter_value.include?("..")
start_date, end_date = filter_value.split("..").map { |date| Time.zone.parse(date) }
filter_range = (start_date..end_date)
return filter_range.cover?(job_value)
end

filter = Time.zone.parse(filter_value)
job_value >= filter
end

def satisfy_filter?(job)
filters.all? { |property| public_send(property) == job.public_send(property) }
filters.all? do |property|
filter_value = public_send(property)
job_value = job.public_send(property)

is_date_filter?(property) ? satisfy_date_filter?(filter_value, job_value) : filter_value == job_value
end
end

def is_date_filter?(property)
[ :finished_at, :scheduled_at, :enqueued_at ].include?(property)
end

def filters
Expand Down