From dc17d304ce280f99a732270458c70d1918598138 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Thu, 27 Apr 2023 12:01:00 +0200 Subject: [PATCH] Implement RedisClient#measure_round_trip_delay Latency is returned as a Float in milliseconds. For the Ruby driver, it's equivalent to measuring how long `call("PING")` takes, however with the Hiredis driver, the latency is measured without ever holding the GVL which allows to give a much more accurate measure that isn't impacted by GVL contention. Ref: https://github.com/redis/hiredis-rb/issues/74 Test script: ```ruby def fibonacci( n ) return n if ( 0..1 ).include? n ( fibonacci( n - 1 ) + fibonacci( n - 2 ) ) end require "redis-client" if ENV["DRIVER"] == "hiredis" require "hiredis-client" end client = RedisClient.new threads = 10.times.map do Thread.new do loop do fibonacci(30) end end end 5.times do puts "latency: #{client.measure_round_trip_delay}ms" sleep 1 end ``` ``` $ DRIVER=ruby bundle exec ruby -I hiredis-client/lib/ /tmp/measure-latency.rb latency: 1033.9850000143051ms latency: 1039.7799999713898ms latency: 1040.0930000543594ms latency: 1050.2749999761581ms latency: 1044.6280000209808ms ``` ``` $ DRIVER=hiredis bundle exec ruby -I hiredis-client/lib/ /tmp/measure-latency.rb latency: 0.307ms latency: 0.351ms latency: 0.236ms latency: 0.221ms latency: 0.321ms ``` --- .../redis_client/hiredis/hiredis_connection.c | 89 +++++++++++++++++++ lib/redis_client.rb | 8 ++ lib/redis_client/ruby_connection.rb | 6 ++ test/redis_client_test.rb | 8 ++ 4 files changed, 111 insertions(+) diff --git a/hiredis-client/ext/redis_client/hiredis/hiredis_connection.c b/hiredis-client/ext/redis_client/hiredis/hiredis_connection.c index 44795ce..84bb281 100644 --- a/hiredis-client/ext/redis_client/hiredis/hiredis_connection.c +++ b/hiredis-client/ext/redis_client/hiredis/hiredis_connection.c @@ -39,6 +39,7 @@ #include "hiredis.h" #include "net.h" #include "hiredis_ssl.h" +#include #if !defined(RUBY_ASSERT) # define RUBY_ASSERT(condition) ((void)0) @@ -797,6 +798,93 @@ static VALUE hiredis_close(VALUE self) { return Qnil; } +static inline double diff_timespec_ms(const struct timespec *time1, const struct timespec *time0) { + return ((time1->tv_sec - time0->tv_sec) * 1000.0) + + (time1->tv_nsec - time0->tv_nsec) / 1000000.0; +} + +static inline int timeval_to_msec(struct timeval duration) { + return duration.tv_sec * 1000 + duration.tv_usec / 1000; +} + +typedef struct { + hiredis_connection_t *connection; + struct timespec start; + struct timespec end; + int return_value; +} hiredis_measure_round_trip_delay_args_t; + +static const size_t pong_length = 7; + +static void *hiredis_measure_round_trip_delay_safe(void *_args) { + hiredis_measure_round_trip_delay_args_t *args = _args; + hiredis_connection_t *connection = args->connection; + redisReader *reader = connection->context->reader; + + if (reader->len - reader->pos != 0) { + args->return_value = REDIS_ERR; + return NULL; + } + + redisAppendFormattedCommand(connection->context, "PING\r\n", 6); + + clock_gettime(CLOCK_MONOTONIC, &args->start); + + int wdone = 0; + do { + if (redisBufferWrite(connection->context, &wdone) == REDIS_ERR) { + args->return_value = REDIS_ERR; + return NULL; + } + } while (!wdone); + + struct pollfd wfd[1]; + wfd[0].fd = connection->context->fd; + wfd[0].events = POLLIN; + int retval = poll(wfd, 1, timeval_to_msec(connection->read_timeout)); + if (retval == -1) { + args->return_value = REDIS_ERR_IO; + return NULL; + } else if (!retval) { + args->return_value = REDIS_ERR_IO; + return NULL; + } + + redisBufferRead(connection->context); + + if (reader->len - reader->pos != pong_length) { + args->return_value = REDIS_ERR; + return NULL; + } + + if (strncmp(reader->buf + reader->pos, "+PONG\r\n", pong_length) != 0) { + args->return_value = REDIS_ERR; + return NULL; + } + reader->pos += pong_length; + + clock_gettime(CLOCK_MONOTONIC, &args->end); + args->return_value = REDIS_OK; + return NULL; +} + +static VALUE hiredis_measure_round_trip_delay(VALUE self) { + CONNECTION(self, connection); + ENSURE_CONNECTED(connection); + + hiredis_measure_round_trip_delay_args_t args = { + .connection = connection, + }; + rb_thread_call_without_gvl(hiredis_measure_round_trip_delay_safe, &args, RUBY_UBF_IO, 0); + + if (args.return_value != REDIS_OK) { + hiredis_raise_error_and_disconnect(connection, rb_eRedisClientReadTimeoutError); + return Qnil; // unreachable; + } + + return DBL2NUM(diff_timespec_ms(&args.end, &args.start)); +} + RUBY_FUNC_EXPORTED void Init_hiredis_connection(void) { // Qfalse == NULL, so we can't return Qfalse in `reply_create_bool()` RUBY_ASSERT((void *)Qfalse == NULL); @@ -843,6 +931,7 @@ RUBY_FUNC_EXPORTED void Init_hiredis_connection(void) { rb_define_private_method(rb_cHiredisConnection, "_read", hiredis_read, 0); rb_define_private_method(rb_cHiredisConnection, "flush", hiredis_flush, 0); rb_define_private_method(rb_cHiredisConnection, "_close", hiredis_close, 0); + rb_define_method(rb_cHiredisConnection, "measure_round_trip_delay", hiredis_measure_round_trip_delay, 0); VALUE rb_cHiredisSSLContext = rb_define_class_under(rb_cHiredisConnection, "SSLContext", rb_cObject); rb_define_alloc_func(rb_cHiredisSSLContext, hiredis_ssl_context_alloc); diff --git a/lib/redis_client.rb b/lib/redis_client.rb index c81e55e..f8b13c0 100644 --- a/lib/redis_client.rb +++ b/lib/redis_client.rb @@ -204,6 +204,14 @@ def pubsub sub end + def measure_round_trip_delay + ensure_connected do |connection| + @middlewares.call(["PING"], config) do + connection.measure_round_trip_delay + end + end + end + def call(*command, **kwargs) command = @command_builder.generate(command, kwargs) result = ensure_connected do |connection| diff --git a/lib/redis_client/ruby_connection.rb b/lib/redis_client/ruby_connection.rb index b8714e4..8b360b2 100644 --- a/lib/redis_client/ruby_connection.rb +++ b/lib/redis_client/ruby_connection.rb @@ -101,6 +101,12 @@ def read(timeout = nil) raise ConnectionError, error.message end + def measure_round_trip_delay + start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) + call(["PING"], @read_timeout) + Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start + end + private def connect diff --git a/test/redis_client_test.rb b/test/redis_client_test.rb index 499047e..297d247 100644 --- a/test/redis_client_test.rb +++ b/test/redis_client_test.rb @@ -113,4 +113,12 @@ def test_handle_async_thread_kill assert_equal i.to_s, @redis.call("GET", "key#{i}") end end + + def test_measure_round_trip_delay + assert_equal "OK", @redis.call("SET", "foo", "bar") + assert_instance_of Float, @redis.measure_round_trip_delay + assert_equal "OK", @redis.call("SET", "foo", "bar") + @redis.close + assert_instance_of Float, @redis.measure_round_trip_delay + end end