Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workers filters #115

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion app/controllers/mission_control/jobs/workers_controller.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
class MissionControl::Jobs::WorkersController < MissionControl::Jobs::ApplicationController
before_action :ensure_exposed_workers
before_action :set_filters, only: :index

helper_method :active_filters?, :workers_filter_param

def index
@workers_page = MissionControl::Jobs::Page.new(workers_relation, page: params[:page].to_i)
Expand All @@ -18,6 +21,22 @@ def ensure_exposed_workers
end

def workers_relation
MissionControl::Jobs::Current.server.workers_relation
MissionControl::Jobs::Current.server.workers_relation.where(**@worker_filters)
end

def active_filters?
@worker_filters.any?
end

def workers_filter_param
if @worker_filters&.any?
{ filter: @worker_filters }
else
{}
end
end

def set_filters
@worker_filters = { hostname: params.dig(:filter, :hostname).presence, pid: params.dig(:filter, :pid), queues: params.dig(:filter, :queues) }.compact
end
end
24 changes: 24 additions & 0 deletions app/views/mission_control/jobs/workers/_filters.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<div class="filter level-left">
<div class="field is-grouped">
<div class="control">
<%= form_for :filter, url: application_workers_path(MissionControl::Jobs::Current.application), method: :get,
data: { controller: "form", action: "input->form#debouncedSubmit" } do |form| %>

<div class="select is-rounded">
<%= form.text_field :hostname, value: @worker_filters[:hostname], class: "input", list: "job-classes", placeholder: "Filter by hostname..." %>
</div>

<div class="select is-rounded">
<%= form.text_field :name, value: @worker_filters[:pid], class: "input", list: "job-classes", placeholder: "Filter by pid..." %>
</div>

<%= hidden_field_tag :server_id, MissionControl::Jobs::Current.server.id %>

<% end %>
</div>

<div class="control">
<%= link_to "Clear", application_workers_path(MissionControl::Jobs::Current.application, hostname: nil, pid: nil), class: "button" %>
</div>
</div>
</div>
15 changes: 13 additions & 2 deletions app/views/mission_control/jobs/workers/index.html.erb
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
<% navigation(title: "Workers", section: :workers) %>

<%= render "mission_control/jobs/workers/workers_page", workers_page: @workers_page %>
<% if @workers_page.empty? && !active_filters? %>
<%= blank_status_notice "There are no active workers" %>
<% else %>
<div class="level is-flex-wrap-wrap">
<%= render "mission_control/jobs/workers/filters" %>
</div>

<%= render "mission_control/jobs/shared/pagination_toolbar", page: @workers_page, filter_param: {} %>
<% if @workers_page.empty? %>
<%= blank_status_notice "No workers found with the given filters" %>
<% else %>
<%= render "mission_control/jobs/workers/workers_page", workers_page: @workers_page %>

<%= render "mission_control/jobs/shared/pagination_toolbar", page: @workers_page, filter_param: workers_filter_param %>
<% end %>
<% end %>
25 changes: 23 additions & 2 deletions lib/active_job/queue_adapters/solid_queue_ext/workers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,29 @@ def find_worker(worker_id)
end

private
def solid_queue_processes_from_workers_relation(relation)
SolidQueue::Process.where(kind: "Worker").offset(relation.offset_value).limit(relation.limit_value)

def solid_queue_processes_from_workers_relation(workers_relation)
SolidQueue::Process.where(kind: "Worker")
.then { |workers| filter_by_hostname(workers, workers_relation.hostname) }
.then { |workers| filter_by_pid(workers, workers_relation.pid) }
.then { |workers| limit(workers, workers_relation.limit_value) }
.then { |workers| offset(workers, workers_relation.offset_value) }
end

def filter_by_hostname(workers, hostname)
hostname.present? ? workers.where(hostname: hostname) : workers
end

def filter_by_pid(workers, pid)
pid.present? ? workers.where(pid: pid) : workers
end

def limit(workers, limit)
workers.limit(limit)
end

def offset(workers, offset)
workers.offset(offset)
end

def worker_from_solid_queue_process(process)
Expand Down
33 changes: 32 additions & 1 deletion lib/mission_control/jobs/workers_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,30 @@ class MissionControl::Jobs::WorkersRelation

delegate :last, :[], :to_s, :reverse, to: :to_a

attr_reader :hostname, :pid, :queues

ALL_WORKERS_LIMIT = 100_000_000 # When no limit value it defaults to "all workers"
IN_MEMORY_FILTERS = %w[ queues ]

def initialize(queue_adapter:)
@queue_adapter = queue_adapter

set_defaults
end

# Returns a +MissionControl::Jobs::WorkersRelation+ with the configured filtering options.
#
# === Options
# * <tt>:hostname</tt> - To only include the workers of a given hostname.
# * <tt>:pid</tt> - To only include the workers of a given pid.
# * <tt>:queues</tt> - To only include workers processing jobs from a the given queues.
def where(hostname: nil, pid: nil, queues: nil)
# Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses
arguments = { hostname: hostname, pid: pid, queues: queues }.compact

clone_with **arguments
end

def offset(offset)
clone_with offset_value: offset
end
Expand Down Expand Up @@ -51,13 +67,28 @@ def empty?
alias size count

private
attr_writer :hostname, :pid, :queues

def set_defaults
self.offset_value = 0
self.limit_value = ALL_WORKERS_LIMIT
end

def workers
@workers ||= @queue_adapter.fetch_workers(self)
return @workers if @workers

@workers = @queue_adapter.fetch_workers(self)
perform_in_memory_filtering(@workers)
@workers
end

def perform_in_memory_filtering(workers)
workers.then { |workers| filter_by_queues(workers) }
filter_by_queues(workers) if queues.present?
end

def filter_by_queues(workers)
queues.present? workers.filter { |worker| worker.metadata['queues'].match?(queues) }
end

def query_count
Expand Down