Skip to content

Commit

Permalink
move more of the initialization code out of GarnetServer constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
s3w3nofficial committed Jan 22, 2025
1 parent d7c37bf commit 4b55f59
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 91 deletions.
12 changes: 9 additions & 3 deletions benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Garnet;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Embedded.server
{
Expand All @@ -18,10 +19,15 @@ internal sealed class EmbeddedRespServer : GarnetServer
/// Creates an EmbeddedRespServer instance
/// </summary>
/// <param name="opts">Server options to configure the base GarnetServer instance</param>
/// <param name="loggerFactory">Logger factory to configure the base GarnetServer instance</param>
/// <param name="logger">Logger factory to configure the base GarnetServer instance</param>
/// <param name="server">Server network</param>
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null)
: base(opts, loggerFactory, server)
/// <param name="store">Server network</param>
public EmbeddedRespServer(
IOptions<GarnetServerOptions> opts,
ILogger<GarnetServer> logger,
GarnetServerEmbedded server,
StoreWrapper store)
: base(opts, logger, server, store)
{
this.garnetServerEmbedded = server;
}
Expand Down
9 changes: 9 additions & 0 deletions benchmark/BDN.benchmark/Lua/LuaRunnerOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using BenchmarkDotNet.Attributes;
using Embedded.server;
using Garnet.host;
using Garnet.server;

namespace BDN.benchmark.Lua
Expand Down Expand Up @@ -154,6 +155,14 @@ public IEnumerable<LuaParams> LuaParamsProvider()
[GlobalSetup]
public void GlobalSetup()
{
var builder = GarnetApplication.CreateHostBuilder([], new GarnetServerOptions()
{
EnableLua = true,
QuietMode = true
});

var app = builder.Build();

server = new EmbeddedRespServer(new GarnetServerOptions() { EnableLua = true, QuietMode = true });

session = server.GetRespSession();
Expand Down
22 changes: 8 additions & 14 deletions hosting/Windows/Garnet.worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,13 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

class Program
{
static void Main(string[] args)
{
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService(_ => new Worker(args));
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService(_ => new Worker(args));

builder.Services.AddWindowsService(options =>
{
options.ServiceName = "Microsoft Garnet Server";
});
builder.Services.AddWindowsService(options =>
{
options.ServiceName = "Microsoft Garnet Server";
});

var host = builder.Build();
host.Run();
}
}
var host = builder.Build();
host.Run();
61 changes: 59 additions & 2 deletions libs/host/GarnetApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Garnet.host;

Expand All @@ -25,7 +30,53 @@ public GarnetApplication(IHost host)
public IServiceProvider Services => host.Services;

public Task StartAsync(CancellationToken cancellationToken = default)
=> host.StartAsync(cancellationToken);
{
var opts = host.Services.GetRequiredService<IOptions<GarnetServerOptions>>();
var logger = host.Services.GetRequiredService<ILogger<GarnetServer>>();

Debug.Assert(opts != null);

var version = GetVersion();

if (!opts.Value.QuietMode)
{
var red = "\u001b[31m";
var magenta = "\u001b[35m";
var normal = "\u001b[0m";

Console.WriteLine($@"{red} _________
/_||___||_\ {normal}Garnet {version} {(IntPtr.Size == 8 ? "64" : "32")} bit; {(opts.Value.EnableCluster ? "cluster" : "standalone")} mode{red}
'. \ / .' {normal}Port: {opts.Value.Port}{red}
'.\ /.' {magenta}https://aka.ms/GetGarnet{red}
'.'
{normal}");
}

Trace.Listeners.Add(new ConsoleTraceListener());

// Set up an initial memory logger to log messages from configuration parser into memory.
using var memLogProvider = new MemoryLoggerProvider();

var initLogger = (MemoryLogger)memLogProvider.CreateLogger("ArgParser");

logger?.LogInformation("Garnet {version} {bits} bit; {clusterMode} mode; Port: {port}", GetVersion(), IntPtr.Size == 8 ? "64" : "32", opts.Value.EnableCluster ? "cluster" : "standalone", opts.Value.Port);

// Flush initialization logs from memory logger
initLogger.FlushLogger(logger);

var setMax = opts.Value.ThreadPoolMaxThreads <= 0 || ThreadPool.SetMaxThreads(opts.Value.ThreadPoolMaxThreads, opts.Value.ThreadPoolMaxThreads);

if (opts.Value.ThreadPoolMinThreads > 0 && !ThreadPool.SetMinThreads(opts.Value.ThreadPoolMinThreads, opts.Value.ThreadPoolMinThreads))
throw new Exception($"Unable to call ThreadPool.SetMinThreads with {opts.Value.ThreadPoolMinThreads}");

// Retry to set max threads if it wasn't set in the previous step
if (!setMax && !ThreadPool.SetMaxThreads(opts.Value.ThreadPoolMaxThreads, opts.Value.ThreadPoolMaxThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {opts.Value.ThreadPoolMaxThreads}");

logger?.LogTrace("TLS is {tlsEnabled}", opts.Value.TlsOptions == null ? "disabled" : "enabled");

return host.StartAsync(cancellationToken);
}

public Task StopAsync(CancellationToken cancellationToken = default)
=> host.StopAsync(cancellationToken);
Expand Down Expand Up @@ -59,7 +110,7 @@ public static GarnetApplicationBuilder CreateHostBuilder(string[] args)
System.Environment.Exit(0);

// Flush logs from memory logger
//FlushMemoryLogger(this.initLogger, "ArgParser");
//initLogger.FlushLogger(logger);

throw new GarnetException(
"Encountered an error when initializing Garnet server. Please see log messages above for more details.");
Expand All @@ -74,4 +125,10 @@ public static GarnetApplicationBuilder CreateHostBuilder(string[] args, GarnetSe
{
return new (new GarnetApplicationOptions {Args = args}, options);
}

private static string GetVersion()
{
var Version = Assembly.GetExecutingAssembly().GetName().Version;
return $"{Version.Major}.{Version.Minor}.{Version.Build}";
}
}
42 changes: 24 additions & 18 deletions libs/host/GarnetApplicationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,6 @@ public class GarnetApplicationBuilder : IHostApplicationBuilder

internal GarnetApplicationBuilder(GarnetApplicationOptions options, GarnetServerOptions garnetServerOptions)
{
var version = GetVersion();

if (!garnetServerOptions.QuietMode)
{
var red = "\u001b[31m";
var magenta = "\u001b[35m";
var normal = "\u001b[0m";

Console.WriteLine($@"{red} _________
/_||___||_\ {normal}Garnet {version} {(IntPtr.Size == 8 ? "64" : "32")} bit; {(garnetServerOptions.EnableCluster ? "cluster" : "standalone")} mode{red}
'. \ / .' {normal}Port: {garnetServerOptions.Port}{red}
'.\ /.' {magenta}https://aka.ms/GetGarnet{red}
'.'
{normal}");
}

var configuration = new ConfigurationManager();

configuration.AddEnvironmentVariables(prefix: "GARNET_");
Expand All @@ -72,13 +56,13 @@ internal GarnetApplicationBuilder(GarnetApplicationOptions options, GarnetServer
var garnetServerOptionsWrapped = Microsoft.Extensions.Options.Options.Create(garnetServerOptions);
hostApplicationBuilder.Services.AddSingleton(garnetServerOptionsWrapped);

hostApplicationBuilder.Services.AddSingleton<GarnetServerTcp>();
hostApplicationBuilder.Services.AddSingleton<IGarnetServer, GarnetServerTcp>();
hostApplicationBuilder.Services.AddSingleton<StoreFactory>();
hostApplicationBuilder.Services.AddSingleton<CustomCommandManager>();

hostApplicationBuilder.Services.AddSingleton(sp =>
{
var server = sp.GetRequiredService<GarnetServerTcp>();
var server = sp.GetRequiredService<IGarnetServer>();
var opts = sp.GetRequiredService<IOptions<GarnetServerOptions>>();
var customCommandManager = sp.GetRequiredService<CustomCommandManager>();
var logger = sp.GetRequiredService<ILogger<StoreWrapper>>();
Expand Down Expand Up @@ -129,6 +113,28 @@ internal GarnetApplicationBuilder(GarnetApplicationOptions options, GarnetServer
customCommandManager, appendOnlyFile, opts.Value, logger, clusterFactory: clusterFactory);
});

if (!garnetServerOptions.DisablePubSub)
{
hostApplicationBuilder.Services
.AddSingleton(sp =>
{
var opts = sp.GetRequiredService<IOptions<GarnetServerOptions>>();
return SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>.Create(opts);
});
}

hostApplicationBuilder.Services.AddSingleton(sp =>
{
var storeWrapper = sp.GetRequiredService<StoreWrapper>();
var subscriberBroker = sp.GetService<SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>>();

return new GarnetProvider(storeWrapper, subscriberBroker);
});

hostApplicationBuilder.Services.AddSingleton<MetricsApi>();
hostApplicationBuilder.Services.AddSingleton<RegisterApi>();
hostApplicationBuilder.Services.AddSingleton<StoreApi>();

hostApplicationBuilder.Services.AddHostedService<GarnetServer>();
}

Expand Down
64 changes: 10 additions & 54 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class GarnetServer : IHostedService, IDisposable
private readonly GarnetServerOptions opts;
private IGarnetServer server;
private SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> subscribeBroker;
private readonly MemoryLogger initLogger;
private readonly ILogger<GarnetServer> logger;

/// <summary>
Expand All @@ -52,7 +51,7 @@ public class GarnetServer : IHostedService, IDisposable
public GarnetServer(
IOptions<GarnetServerOptions> opts,
ILogger<GarnetServer> logger,
GarnetServerTcp garnetServerTcp,
IGarnetServer garnetServerTcp,
StoreWrapper storeWrapper)
{
this.logger = logger;
Expand All @@ -61,53 +60,24 @@ public GarnetServer(

Trace.Listeners.Add(new ConsoleTraceListener());

// Set up an initial memory logger to log messages from configuration parser into memory.
using (var memLogProvider = new MemoryLoggerProvider())
{
this.initLogger = (MemoryLogger)memLogProvider.CreateLogger("ArgParser");
}

// Assign values to GarnetServerOptions
this.opts = opts.Value;
this.opts.AuthSettings = this.opts.AuthSettings;

this.InitializeServer();
}

private void InitializeServer()
{
Debug.Assert(opts != null);

using (logger.BeginScope("GarnetServer"))
{
logger?.LogInformation("Garnet {version} {bits} bit; {clusterMode} mode; Port: {port}", GetVersion(), IntPtr.Size == 8 ? "64" : "32", opts.EnableCluster ? "cluster" : "standalone", opts.Port);

// Flush initialization logs from memory logger
FlushMemoryLogger(this.initLogger, "ArgParser");

var setMax = opts.ThreadPoolMaxThreads <= 0 || ThreadPool.SetMaxThreads(opts.ThreadPoolMaxThreads, opts.ThreadPoolMaxThreads);

if (opts.ThreadPoolMinThreads > 0 && !ThreadPool.SetMinThreads(opts.ThreadPoolMinThreads, opts.ThreadPoolMinThreads))
throw new Exception($"Unable to call ThreadPool.SetMinThreads with {opts.ThreadPoolMinThreads}");

// Retry to set max threads if it wasn't set in the previous step
if (!setMax && !ThreadPool.SetMaxThreads(opts.ThreadPoolMaxThreads, opts.ThreadPoolMaxThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {opts.ThreadPoolMaxThreads}");

if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true);
// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, subscribeBroker);

logger?.LogTrace("TLS is {tlsEnabled}", opts.TlsOptions == null ? "disabled" : "enabled");
// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
Register = new RegisterApi(Provider);
Store = new StoreApi(storeWrapper);

// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, subscribeBroker);

// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
Register = new RegisterApi(Provider);
Store = new StoreApi(storeWrapper);

server.Register(WireFormat.ASCII, Provider);
}
server.Register(WireFormat.ASCII, Provider);
}

/// <summary>
Expand All @@ -119,7 +89,7 @@ public void Start()
server.Start();
Provider.Start();
if (!opts.QuietMode)
Console.WriteLine("* Ready to accept connections");
this.logger.LogInformation("* Ready to accept connections");
}

/// <summary>
Expand Down Expand Up @@ -158,20 +128,6 @@ private void InternalDispose()
opts.AuthSettings?.Dispose();
}

/// <summary>
/// Flushes MemoryLogger entries into a destination logger.
/// Destination logger is either created from ILoggerFactory parameter or from a default console logger.
/// </summary>
/// <param name="memoryLogger">The memory logger</param>
/// <param name="categoryName">The category name of the destination logger</param>
private void FlushMemoryLogger(MemoryLogger memoryLogger, string categoryName)
{
using (this.logger.BeginScope(categoryName))
{
memoryLogger.FlushLogger(this.logger);
}
}

public Task StartAsync(CancellationToken cancellationToken)
{
Start();
Expand Down
8 changes: 8 additions & 0 deletions libs/server/PubSub/SubscribeBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Microsoft.Extensions.Options;
using Tsavorite.core;

namespace Garnet.server
Expand All @@ -35,6 +36,13 @@ public sealed class SubscribeBroker<TKey, TValue, TKeyValueSerializer> : IDispos
readonly ManualResetEvent done = new(true);
bool disposed = false;

public static SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> Create(
IOptions<GarnetServerOptions> options)
{
return new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null,
options.Value.PubSubPageSizeBytes(), options.Value.SubscriberRefreshFrequencyMs, true);
}

/// <summary>
/// Constructor
/// </summary>
Expand Down

0 comments on commit 4b55f59

Please sign in to comment.