Skip to content

Commit

Permalink
Merge pull request #113 from redis-rb/client-latency
Browse files Browse the repository at this point in the history
Implement RedisClient#measure_round_trip_delay
  • Loading branch information
casperisfine authored May 2, 2023
2 parents 54f8247 + dc17d30 commit ff42581
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
89 changes: 89 additions & 0 deletions hiredis-client/ext/redis_client/hiredis/hiredis_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "hiredis.h"
#include "net.h"
#include "hiredis_ssl.h"
#include <poll.h>

#if !defined(RUBY_ASSERT)
# define RUBY_ASSERT(condition) ((void)0)
Expand Down Expand Up @@ -797,6 +798,93 @@ static VALUE hiredis_close(VALUE self) {
return Qnil;
}

static inline double diff_timespec_ms(const struct timespec *time1, const struct timespec *time0) {
return ((time1->tv_sec - time0->tv_sec) * 1000.0)
+ (time1->tv_nsec - time0->tv_nsec) / 1000000.0;
}

static inline int timeval_to_msec(struct timeval duration) {
return duration.tv_sec * 1000 + duration.tv_usec / 1000;
}

typedef struct {
hiredis_connection_t *connection;
struct timespec start;
struct timespec end;
int return_value;
} hiredis_measure_round_trip_delay_args_t;

static const size_t pong_length = 7;

static void *hiredis_measure_round_trip_delay_safe(void *_args) {
hiredis_measure_round_trip_delay_args_t *args = _args;
hiredis_connection_t *connection = args->connection;
redisReader *reader = connection->context->reader;

if (reader->len - reader->pos != 0) {
args->return_value = REDIS_ERR;
return NULL;
}

redisAppendFormattedCommand(connection->context, "PING\r\n", 6);

clock_gettime(CLOCK_MONOTONIC, &args->start);

int wdone = 0;
do {
if (redisBufferWrite(connection->context, &wdone) == REDIS_ERR) {
args->return_value = REDIS_ERR;
return NULL;
}
} while (!wdone);

struct pollfd wfd[1];
wfd[0].fd = connection->context->fd;
wfd[0].events = POLLIN;
int retval = poll(wfd, 1, timeval_to_msec(connection->read_timeout));
if (retval == -1) {
args->return_value = REDIS_ERR_IO;
return NULL;
} else if (!retval) {
args->return_value = REDIS_ERR_IO;
return NULL;
}

redisBufferRead(connection->context);

if (reader->len - reader->pos != pong_length) {
args->return_value = REDIS_ERR;
return NULL;
}

if (strncmp(reader->buf + reader->pos, "+PONG\r\n", pong_length) != 0) {
args->return_value = REDIS_ERR;
return NULL;
}
reader->pos += pong_length;

clock_gettime(CLOCK_MONOTONIC, &args->end);
args->return_value = REDIS_OK;
return NULL;
}

static VALUE hiredis_measure_round_trip_delay(VALUE self) {
CONNECTION(self, connection);
ENSURE_CONNECTED(connection);

hiredis_measure_round_trip_delay_args_t args = {
.connection = connection,
};
rb_thread_call_without_gvl(hiredis_measure_round_trip_delay_safe, &args, RUBY_UBF_IO, 0);

if (args.return_value != REDIS_OK) {
hiredis_raise_error_and_disconnect(connection, rb_eRedisClientReadTimeoutError);
return Qnil; // unreachable;
}

return DBL2NUM(diff_timespec_ms(&args.end, &args.start));
}

RUBY_FUNC_EXPORTED void Init_hiredis_connection(void) {
// Qfalse == NULL, so we can't return Qfalse in `reply_create_bool()`
RUBY_ASSERT((void *)Qfalse == NULL);
Expand Down Expand Up @@ -843,6 +931,7 @@ RUBY_FUNC_EXPORTED void Init_hiredis_connection(void) {
rb_define_private_method(rb_cHiredisConnection, "_read", hiredis_read, 0);
rb_define_private_method(rb_cHiredisConnection, "flush", hiredis_flush, 0);
rb_define_private_method(rb_cHiredisConnection, "_close", hiredis_close, 0);
rb_define_method(rb_cHiredisConnection, "measure_round_trip_delay", hiredis_measure_round_trip_delay, 0);

VALUE rb_cHiredisSSLContext = rb_define_class_under(rb_cHiredisConnection, "SSLContext", rb_cObject);
rb_define_alloc_func(rb_cHiredisSSLContext, hiredis_ssl_context_alloc);
Expand Down
8 changes: 8 additions & 0 deletions lib/redis_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ def pubsub
sub
end

def measure_round_trip_delay
ensure_connected do |connection|
@middlewares.call(["PING"], config) do
connection.measure_round_trip_delay
end
end
end

def call(*command, **kwargs)
command = @command_builder.generate(command, kwargs)
result = ensure_connected do |connection|
Expand Down
6 changes: 6 additions & 0 deletions lib/redis_client/ruby_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ def read(timeout = nil)
raise ConnectionError, error.message
end

def measure_round_trip_delay
start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
call(["PING"], @read_timeout)
Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start
end

private

def connect
Expand Down
8 changes: 8 additions & 0 deletions test/redis_client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,12 @@ def test_handle_async_thread_kill
assert_equal i.to_s, @redis.call("GET", "key#{i}")
end
end

def test_measure_round_trip_delay
assert_equal "OK", @redis.call("SET", "foo", "bar")
assert_instance_of Float, @redis.measure_round_trip_delay
assert_equal "OK", @redis.call("SET", "foo", "bar")
@redis.close
assert_instance_of Float, @redis.measure_round_trip_delay
end
end

0 comments on commit ff42581

Please sign in to comment.