Skip to content

Commit

Permalink
perf: lessen wasting memory allocation for Pub/Sub (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Aug 12, 2023
1 parent 1285c1f commit 90f178b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
25 changes: 13 additions & 12 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def subscribe(client, timeout)
def initialize(router, command_builder)
@router = router
@command_builder = command_builder
@states = {}
@state_list = []
@state_dict = {}
end

def call(*args, **kwargs)
Expand All @@ -55,21 +56,21 @@ def call_v(command)
end

def close
@states.each_value(&:close)
@states.clear
@state_list.each(&:close)
@state_list.clear
@state_dict.clear
end

def next_event(timeout = nil)
return if @states.empty?
return if @state_list.empty?

max_duration = calc_max_duration(timeout)
starting = obtain_current_time
clients = @states.values
loop do
break if max_duration > 0 && obtain_current_time - starting > max_duration

clients.shuffle!
clients.each do |pubsub|
@state_list.shuffle!
@state_list.each do |pubsub|
message = pubsub.take_message(timeout)
return message if message
end
Expand All @@ -80,26 +81,26 @@ def next_event(timeout = nil)

def _call(command)
node_key = @router.find_node_key(command)
add_state(node_key)
try_call(node_key, command)
end

def try_call(node_key, command, retry_count: 1)
@states[node_key].call(command)
add_state(node_key).call(command)
rescue ::RedisClient::CommandError => e
raise if !e.message.start_with?('MOVED') || retry_count <= 0

# for sharded pub/sub
node_key = e.message.split[2]
add_state(node_key)
retry_count -= 1
retry
end

def add_state(node_key)
return @states[node_key] if @states.key?(node_key)
return @state_dict[node_key] if @state_dict.key?(node_key)

@states[node_key] = State.new(@router.find_node(node_key).pubsub)
state = State.new(@router.find_node(node_key).pubsub)
@state_list << state
@state_dict[node_key] = state
end

def obtain_current_time
Expand Down
9 changes: 9 additions & 0 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ def test_pubsub_without_subscription
pubsub.close
end

def test_pubsub_with_wrong_command
pubsub = @client.pubsub
assert_nil(pubsub.call('SUBWAY'))
assert_nil(pubsub.call_v(%w[SUBSCRIBE]))
assert_instance_of(::RedisClient::CommandError, pubsub.next_event, 'unknown command')
assert_instance_of(::RedisClient::CommandError, pubsub.next_event, 'wrong number of arguments')
pubsub.close
end

def test_global_pubsub
sub = Fiber.new do |pubsub|
channel = 'my-global-channel'
Expand Down

0 comments on commit 90f178b

Please sign in to comment.