Skip to content

Commit

Permalink
Improve handling of setting the send buffer vs the batched sink buffer
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka committed Dec 19, 2024
1 parent 55e0c2b commit 5ed58a3
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 37 deletions.
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.PHONY: test lint update
test:
bundle exec rake test

lint:
bundle exec rake lint_fix

update:
bundle update

check: update lint test
1 change: 1 addition & 0 deletions lib/statsd/instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ def extended(klass)
require "statsd/instrument/helpers"
require "statsd/instrument/assertions"
require "statsd/instrument/expectation"
require "statsd/instrument/connection_behavior"
require "statsd/instrument/uds_connection"
require "statsd/instrument/udp_connection"
require "statsd/instrument/sink"
Expand Down
22 changes: 18 additions & 4 deletions lib/statsd/instrument/connection_behavior.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,31 @@ def close

def send_buffer_size
if socket
socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
send_buffer_size_from_socket(socket)
else
@max_packet_size
end
end

def type
raise NotImplementedError, "#{self.class} must implement #type"
end

private

def setup_socket(socket)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i)
socket
def send_buffer_size_from_socket(original_socket)
original_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
end

def setup_socket(original_socket)
original_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i)
if send_buffer_size_from_socket(original_socket) < @max_packet_size
StatsD.logger.warn do
"[#{self.class.name}] Could not set socket send buffer size to #{@max_packet_size} " \
"allowed size by environment/OS is (#{send_buffer_size_from_socket(original_socket)})."
end
end
original_socket
rescue IOError => e
StatsD.logger.debug do
"[#{self.class.name}] Failed to create socket: #{e.class}: #{e.message}"
Expand Down
13 changes: 10 additions & 3 deletions lib/statsd/instrument/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,30 @@ def default_sink_for_environment
connection = if statsd_uds_send?
StatsD::Instrument::UdsConnection.new(
statsd_socket_path,
max_packet_size: statsd_max_packet_size.to_i,
max_packet_size: statsd_max_packet_size,
)
else
host, port = statsd_addr.split(":")
StatsD::Instrument::UdpConnection.new(
host,
port.to_i,
max_packet_size: statsd_max_packet_size.to_i,
max_packet_size: statsd_max_packet_size,
)
end

sink = StatsD::Instrument::Sink.new(connection)
if statsd_batching?
current_send_buffer_size = connection.send_buffer_size
if current_send_buffer_size < statsd_max_packet_size
StatsD.logger.warn do
"[StatsD::Instrument::Environment] Send buffer size #{current_send_buffer_size} differs from " \
"max packet size #{statsd_max_packet_size}. Using send buffer size as max packet size."
end
end
return StatsD::Instrument::BatchedSink.new(
sink,
buffer_capacity: statsd_buffer_capacity,
max_packet_size: statsd_max_packet_size.to_i,
max_packet_size: [current_send_buffer_size, statsd_max_packet_size].min,
statistics_interval: statsd_batch_statistics_interval,
)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/statsd/instrument/udp_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def type

def socket
@socket ||= begin
socket = UDPSocket.new
setup_socket(socket)&.tap do |s|
udp_socket = UDPSocket.new
setup_socket(udp_socket)&.tap do |s|
s.connect(@host, @port)
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/statsd/instrument/uds_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def type

def socket
@socket ||= begin
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
setup_socket(socket)&.tap do |s|
unix_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
setup_socket(unix_socket)&.tap do |s|
s.connect(Socket.pack_sockaddr_un(@socket_path))
end
end
Expand Down
85 changes: 59 additions & 26 deletions test/environment_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,55 +78,88 @@ def test_client_from_env_uses_regular_udp_sink_when_buffer_capacity_is_0
end

def test_client_from_env_uses_uds_sink_with_correct_packet_size_in_production
socket_path = "/tmp/statsd-test-#{Process.pid}.sock"

# Create a UDS server socket
server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
server.bind(Socket.pack_sockaddr_un(socket_path))

env = StatsD::Instrument::Environment.new(
"STATSD_ENV" => "production",
"STATSD_SOCKET_PATH" => "/tmp/statsd.sock",
"STATSD_SOCKET_PATH" => socket_path,
"STATSD_MAX_PACKET_SIZE" => "65507",
"STATSD_USE_NEW_CLIENT" => "1",
)

client = env.client
sink = client.sink
connection = sink.connection

assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(65507, connection.instance_variable_get(:@max_packet_size))
begin
client = env.client
sink = client.sink
connection = sink.connection

assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(65507, connection.instance_variable_get(:@max_packet_size))
ensure
server.close
File.unlink(socket_path) if File.exist?(socket_path)
end
end

def test_client_from_env_uses_default_packet_size_for_uds_when_not_specified
socket_path = "/tmp/statsd-test-#{Process.pid}-default.sock"

# Create a UDS server socket
server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
server.bind(Socket.pack_sockaddr_un(socket_path))

env = StatsD::Instrument::Environment.new(
"STATSD_ENV" => "production",
"STATSD_SOCKET_PATH" => "/tmp/statsd.sock",
"STATSD_SOCKET_PATH" => socket_path,
"STATSD_USE_NEW_CLIENT" => "1",
)

client = env.client
sink = client.sink
connection = sink.connection

assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(
StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE,
connection.instance_variable_get(:@max_packet_size),
)
begin
client = env.client
sink = client.sink
connection = sink.connection

assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(
StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE,
connection.instance_variable_get(:@max_packet_size),
)
ensure
server.close
File.unlink(socket_path) if File.exist?(socket_path)
end
end

def test_client_from_env_uses_batched_uds_sink_with_correct_packet_size
socket_path = "/tmp/statsd-test-#{Process.pid}-batched.sock"

# Create a UDS server socket
server = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
server.bind(Socket.pack_sockaddr_un(socket_path))

env = StatsD::Instrument::Environment.new(
"STATSD_ENV" => "production",
"STATSD_SOCKET_PATH" => "/tmp/statsd.sock",
"STATSD_SOCKET_PATH" => socket_path,
"STATSD_MAX_PACKET_SIZE" => "65507",
"STATSD_BUFFER_CAPACITY" => "1000",
"STATSD_USE_NEW_CLIENT" => "1",
)

client = env.client
sink = client.sink
assert_kind_of(StatsD::Instrument::BatchedSink, sink)

underlying_sink = sink.instance_variable_get(:@sink)
connection = underlying_sink.connection
assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(65507, connection.instance_variable_get(:@max_packet_size))
begin
client = env.client
sink = client.sink
assert_kind_of(StatsD::Instrument::BatchedSink, sink)

underlying_sink = sink.instance_variable_get(:@sink)
connection = underlying_sink.connection
assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(65507, connection.instance_variable_get(:@max_packet_size))
ensure
server.close
File.unlink(socket_path) if File.exist?(socket_path)
end
end
end
11 changes: 11 additions & 0 deletions test/udp_sink_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,27 @@ def test_socket_error_should_invalidate_socket
UDPSocket.stubs(:new).returns(socket = mock("socket"))

seq = sequence("connect_fail_connect_succeed")

# First attempt
socket.expects(:setsockopt)
.with(Socket::SOL_SOCKET, Socket::SO_SNDBUF, StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)
.in_sequence(seq)
socket.expects(:getsockopt)
.with(Socket::SOL_SOCKET, Socket::SO_SNDBUF)
.returns(mock(int: StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE))
.in_sequence(seq)
socket.expects(:connect).with("localhost", 8125).in_sequence(seq)
socket.expects(:send).raises(Errno::EDESTADDRREQ).in_sequence(seq)
socket.expects(:close).in_sequence(seq)

# Second attempt after error
socket.expects(:setsockopt)
.with(Socket::SOL_SOCKET, Socket::SO_SNDBUF, StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)
.in_sequence(seq)
socket.expects(:getsockopt)
.with(Socket::SOL_SOCKET, Socket::SO_SNDBUF)
.returns(mock(int: StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE))
.in_sequence(seq)
socket.expects(:connect).with("localhost", 8125).in_sequence(seq)
socket.expects(:send).twice.returns(1).in_sequence(seq)
socket.expects(:close).in_sequence(seq)
Expand Down

0 comments on commit 5ed58a3

Please sign in to comment.