diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 55cd483..71a0ac4 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -19,7 +19,6 @@ def initialize(config = nil, pool: nil, concurrency: nil, **kwargs) @config = config.nil? ? ClusterConfig.new(**kwargs) : config @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {})) @command_builder = @config.command_builder - @pool = pool @kwargs = kwargs @router = nil diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index f3efe47..e32924c 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -17,18 +17,57 @@ class Cluster class Router ZERO_CURSOR_FOR_SCAN = '0' TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry + DEDICATED_ACTIONS = lambda do # rubocop:disable Metrics/BlockLength + action = Struct.new('RedisCommandRoutingAction', :method_name, :reply_transformer, keyword_init: true) + pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc + multiple_key_action = action.new(method_name: :send_multiple_keys_command) + all_node_first_action = action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first) + primary_first_action = action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first) + not_supported_action = action.new(method_name: :fail_not_supported_command) + keyless_action = action.new(method_name: :fail_keyless_command) + { + 'ping' => action.new(method_name: :send_ping_command, reply_transformer: pick_first), + 'wait' => action.new(method_name: :send_wait_command), + 'keys' => action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }), + 'dbsize' => action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), + 'scan' => action.new(method_name: :send_scan_command), + 'lastsave' => action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }), + 'role' => action.new(method_name: :send_command_to_all_nodes), + 'config' => action.new(method_name: :send_config_command), + 'client' => action.new(method_name: :send_client_command), + 'cluster' => action.new(method_name: :send_cluster_command), + 'memory' => action.new(method_name: :send_memory_command), + 'script' => action.new(method_name: :send_script_command), + 'pubsub' => action.new(method_name: :send_pubsub_command), + 'watch' => action.new(method_name: :send_watch_command), + 'mget' => multiple_key_action, + 'mset' => multiple_key_action, + 'del' => multiple_key_action, + 'acl' => all_node_first_action, + 'auth' => all_node_first_action, + 'bgrewriteaof' => all_node_first_action, + 'bgsave' => all_node_first_action, + 'quit' => all_node_first_action, + 'save' => all_node_first_action, + 'flushall' => primary_first_action, + 'flushdb' => primary_first_action, + 'readonly' => not_supported_action, + 'readwrite' => not_supported_action, + 'shutdown' => not_supported_action, + 'discard' => keyless_action, + 'exec' => keyless_action, + 'multi' => keyless_action, + 'unwatch' => keyless_action + }.each_with_object({}) do |(k, v), acc| + acc[k] = v + acc[k.upcase] = v + end + end.call.freeze - private_constant :ZERO_CURSOR_FOR_SCAN, :TSF + private_constant :ZERO_CURSOR_FOR_SCAN, :TSF, :DEDICATED_ACTIONS attr_reader :config - Action = Struct.new( - 'RedisCommandRoutingAction', - :method_name, - :reply_transformer, - keyword_init: true - ) - def initialize(config, concurrent_worker, pool: nil, **kwargs) @config = config @concurrent_worker = concurrent_worker @@ -38,16 +77,15 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @node.reload! @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @command_builder = @config.command_builder - @dedicated_actions = build_dedicated_actions rescue ::RedisClient::Cluster::InitialSetupError => e e.with_config(config) raise end def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - return assign_node_and_send_command(method, command, args, &block) unless @dedicated_actions.key?(command.first) + return assign_node_and_send_command(method, command, args, &block) unless DEDICATED_ACTIONS.key?(command.first) - action = @dedicated_actions[command.first] + action = DEDICATED_ACTIONS[command.first] return send(action.method_name, method, command, args, &block) if action.reply_transformer.nil? reply = send(action.method_name, method, command, args) @@ -257,53 +295,6 @@ def close private - def build_dedicated_actions # rubocop:disable Metrics/AbcSize - pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc - multiple_key_action = Action.new(method_name: :send_multiple_keys_command) - all_node_first_action = Action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first) - primary_first_action = Action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first) - not_supported_action = Action.new(method_name: :fail_not_supported_command) - keyless_action = Action.new(method_name: :fail_keyless_command) - actions = { - 'ping' => Action.new(method_name: :send_ping_command, reply_transformer: pick_first), - 'wait' => Action.new(method_name: :send_wait_command), - 'keys' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }), - 'dbsize' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), - 'scan' => Action.new(method_name: :send_scan_command), - 'lastsave' => Action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }), - 'role' => Action.new(method_name: :send_command_to_all_nodes), - 'config' => Action.new(method_name: :send_config_command), - 'client' => Action.new(method_name: :send_client_command), - 'cluster' => Action.new(method_name: :send_cluster_command), - 'memory' => Action.new(method_name: :send_memory_command), - 'script' => Action.new(method_name: :send_script_command), - 'pubsub' => Action.new(method_name: :send_pubsub_command), - 'watch' => Action.new(method_name: :send_watch_command), - 'mget' => multiple_key_action, - 'mset' => multiple_key_action, - 'del' => multiple_key_action, - 'acl' => all_node_first_action, - 'auth' => all_node_first_action, - 'bgrewriteaof' => all_node_first_action, - 'bgsave' => all_node_first_action, - 'quit' => all_node_first_action, - 'save' => all_node_first_action, - 'flushall' => primary_first_action, - 'flushdb' => primary_first_action, - 'readonly' => not_supported_action, - 'readwrite' => not_supported_action, - 'shutdown' => not_supported_action, - 'discard' => keyless_action, - 'exec' => keyless_action, - 'multi' => keyless_action, - 'unwatch' => keyless_action - }.freeze - actions.each_with_object({}) do |(k, v), acc| - acc[k] = v - acc[k.upcase] = v - end.freeze - end - def send_command_to_all_nodes(method, command, args, &block) @node.call_all(method, command, args, &block) end