Skip to content

Commit

Permalink
Test fix + consolidation of getting directory paths for checkpointing…
Browse files Browse the repository at this point in the history
… and AOF
  • Loading branch information
TalZaccai committed Mar 8, 2025
1 parent cb29622 commit 6e9d4c8
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 71 deletions.
30 changes: 12 additions & 18 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ private void InitializeServer()
if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");

StoreWrapper.DatabaseCreatorDelegate createDatabaseDelegate = (int dbId, out string storeCheckpointDir, out string aofDir) =>
CreateDatabase(dbId, opts, clusterFactory, customCommandManager, out storeCheckpointDir, out aofDir);
StoreWrapper.DatabaseCreatorDelegate createDatabaseDelegate = (int dbId) =>
CreateDatabase(dbId, opts, clusterFactory, customCommandManager);

if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker(null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true, logger);
Expand Down Expand Up @@ -276,12 +276,11 @@ private void InitializeServer()
}

private GarnetDatabase CreateDatabase(int dbId, GarnetServerOptions serverOptions, ClusterFactory clusterFactory,
CustomCommandManager customCommandManager, out string storeCheckpointDir, out string aofDir)
CustomCommandManager customCommandManager)
{
var store = CreateMainStore(dbId, clusterFactory, out var checkpointDir, out storeCheckpointDir);
var objectStore = CreateObjectStore(dbId, clusterFactory, customCommandManager, checkpointDir,
out var objectStoreSizeTracker);
var (aofDevice, aof) = CreateAOF(dbId, out aofDir);
var store = CreateMainStore(dbId, clusterFactory);
var objectStore = CreateObjectStore(dbId, clusterFactory, customCommandManager, out var objectStoreSizeTracker);
var (aofDevice, aof) = CreateAOF(dbId);
return new GarnetDatabase(dbId, store, objectStore, objectStoreSizeTracker, aofDevice, aof,
serverOptions.AdjustedIndexMaxCacheLines == 0,
serverOptions.AdjustedObjectStoreIndexMaxCacheLines == 0);
Expand Down Expand Up @@ -311,18 +310,15 @@ private void LoadModules(CustomCommandManager customCommandManager)
}
}

private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> CreateMainStore(int dbId, IClusterFactory clusterFactory, out string checkpointDir, out string mainStoreCheckpointDir)
private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> CreateMainStore(int dbId, IClusterFactory clusterFactory)
{
kvSettings = opts.GetSettings(loggerFactory, out logFactory);

checkpointDir = (opts.CheckpointDir ?? opts.LogDir) ?? string.Empty;

// Run checkpoint on its own thread to control p99
kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;

var baseName = Path.Combine(checkpointDir, "Store", $"checkpoints{(dbId == 0 ? string.Empty : $"_{dbId}")}");
var baseName = opts.GetMainStoreCheckpointDirectory(dbId);
var defaultNamingScheme = new DefaultCheckpointNamingScheme(baseName);
mainStoreCheckpointDir = baseName;

kvSettings.CheckpointManager = opts.EnableCluster ?
clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, isMainStore: true, logger) :
Expand All @@ -333,7 +329,7 @@ private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator>
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
}

private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> CreateObjectStore(int dbId, IClusterFactory clusterFactory, CustomCommandManager customCommandManager, string checkpointDir, out CacheSizeTracker objectStoreSizeTracker)
private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> CreateObjectStore(int dbId, IClusterFactory clusterFactory, CustomCommandManager customCommandManager, out CacheSizeTracker objectStoreSizeTracker)
{
objectStoreSizeTracker = null;
if (opts.DisableObjects)
Expand All @@ -345,7 +341,7 @@ private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllo
// Run checkpoint on its own thread to control p99
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;

var baseName = Path.Combine(checkpointDir, "ObjectStore", $"checkpoints{(dbId == 0 ? string.Empty : $"_{dbId}")}");
var baseName = opts.GetObjectStoreCheckpointDirectory(dbId);
var defaultNamingScheme = new DefaultCheckpointNamingScheme(baseName);

objKvSettings.CheckpointManager = opts.EnableCluster ?
Expand All @@ -367,16 +363,14 @@ private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllo

}

private (IDevice, TsavoriteLog) CreateAOF(int dbId, out string aofDir)
private (IDevice, TsavoriteLog) CreateAOF(int dbId)
{
aofDir = null;

if (opts.EnableAOF)
{
if (opts.FastAofTruncate && opts.CommitFrequencyMs != -1)
throw new Exception("Need to set CommitFrequencyMs to -1 (manual commits) with MainMemoryReplication");

opts.GetAofSettings(dbId, out var aofSettings, out aofDir);
opts.GetAofSettings(dbId, out var aofSettings);
var aofDevice = aofSettings.LogDevice;
var appendOnlyFile = new TsavoriteLog(aofSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteLog [aof]"));
if (opts.CommitFrequencyMs < 0 && opts.WaitForCommit)
Expand Down
15 changes: 6 additions & 9 deletions libs/server/Databases/DatabaseManagerFactory.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.IO;
using System.Linq;

namespace Garnet.server
Expand Down Expand Up @@ -35,23 +34,21 @@ private static bool ShouldCreateMultipleDatabaseManager(GarnetServerOptions serv
return false;

// If there are multiple databases to recover, create a multi database manager, otherwise create a single database manager.
using (createDatabaseDelegate(0, out var checkpointDir, out var aofDir))
using (createDatabaseDelegate(0))
{
// Check if there are multiple databases to recover from checkpoint
var checkpointDirInfo = new DirectoryInfo(checkpointDir);
var checkpointDirBaseName = checkpointDirInfo.Name;
var checkpointParentDir = checkpointDirInfo.Parent!.FullName;
var checkpointParentDir = serverOptions.MainStoreCheckpointBaseDirectory;
var checkpointDirBaseName = serverOptions.GetCheckpointDirectoryName(0);

if (MultiDatabaseManager.TryGetSavedDatabaseIds(checkpointParentDir, checkpointDirBaseName,
out var dbIds) && dbIds.Any(id => id != 0))
return true;

// Check if there are multiple databases to recover from AOF
if (aofDir != null)
if (serverOptions.EnableAOF)
{
var aofDirInfo = new DirectoryInfo(aofDir);
var aofDirBaseName = aofDirInfo.Name;
var aofParentDir = aofDirInfo.Parent!.FullName;
var aofParentDir = serverOptions.AppendOnlyFileBaseDirectory;
var aofDirBaseName = serverOptions.GetAppendOnlyFileDirectoryName(0);

if (MultiDatabaseManager.TryGetSavedDatabaseIds(aofParentDir, aofDirBaseName,
out dbIds) && dbIds.Any(id => id != 0))
Expand Down
31 changes: 8 additions & 23 deletions libs/server/Databases/MultiDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,6 @@ internal class MultiDatabaseManager : DatabaseManagerBase
// Reusable array for storing database IDs for checkpointing
int[] dbIdsToCheckpoint;

// Path of serialization for the DB IDs file used when committing / recovering to / from AOF
readonly string aofParentDir;

readonly string aofDirBaseName;

// Path of serialization for the DB IDs file used when committing / recovering to / from a checkpoint
readonly string checkpointParentDir;

readonly string checkpointDirBaseName;

public MultiDatabaseManager(StoreWrapper.DatabaseCreatorDelegate createDatabaseDelegate,
StoreWrapper storeWrapper, bool createDefaultDatabase = true) : base(createDatabaseDelegate, storeWrapper)
{
Expand All @@ -68,18 +58,7 @@ public MultiDatabaseManager(StoreWrapper.DatabaseCreatorDelegate createDatabaseD
// Create default database of index 0 (unless specified otherwise)
if (createDefaultDatabase)
{
var db = createDatabaseDelegate(0, out var storeCheckpointDir, out var aofDir);

var checkpointDirInfo = new DirectoryInfo(storeCheckpointDir);
checkpointDirBaseName = checkpointDirInfo.Name;
checkpointParentDir = checkpointDirInfo.Parent!.FullName;

if (aofDir != null)
{
var aofDirInfo = new DirectoryInfo(aofDir);
aofDirBaseName = aofDirInfo.Name;
aofParentDir = aofDirInfo.Parent!.FullName;
}
var db = createDatabaseDelegate(0);

// Set new database in map
if (!TryAddDatabase(0, ref db))
Expand All @@ -106,6 +85,9 @@ public override void RecoverCheckpoint(bool replicaRecover = false, bool recover
throw new GarnetException(
$"Unexpected call to {nameof(MultiDatabaseManager)}.{nameof(RecoverCheckpoint)} with {nameof(replicaRecover)} == true.");

var checkpointParentDir = StoreWrapper.serverOptions.MainStoreCheckpointBaseDirectory;
var checkpointDirBaseName = StoreWrapper.serverOptions.GetCheckpointDirectoryName(0);

int[] dbIdsToRecover;
try
{
Expand Down Expand Up @@ -367,6 +349,9 @@ public override async Task WaitForCommitToAofAsync(CancellationToken token = def
/// <inheritdoc/>
public override void RecoverAOF()
{
var aofParentDir = StoreWrapper.serverOptions.AppendOnlyFileBaseDirectory;
var aofDirBaseName = StoreWrapper.serverOptions.GetAppendOnlyFileDirectoryName(0);

int[] dbIdsToRecover;
try
{
Expand Down Expand Up @@ -662,7 +647,7 @@ public override ref GarnetDatabase TryGetOrAddDatabase(int dbId, out bool succes
return ref databasesMapSnapshot[dbId];
}

var db = CreateDatabaseDelegate(dbId, out _, out _);
var db = CreateDatabaseDelegate(dbId);
if (!databases.TrySetValueUnsafe(dbId, ref db, false))
return ref GarnetDatabase.Empty;
}
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Databases/SingleDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public SingleDatabaseManager(StoreWrapper.DatabaseCreatorDelegate createDatabase
// Create default database of index 0 (unless specified otherwise)
if (createDefaultDatabase)
{
defaultDatabase = createDatabaseDelegate(0, out _, out _);
defaultDatabase = createDatabaseDelegate(0);
}
}

Expand Down
79 changes: 62 additions & 17 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,64 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public bool AllowMultiDb => !EnableCluster && MaxDatabases > 1;

/// <summary>
/// Gets the base directory for storing checkpoints
/// </summary>
public string CheckpointBaseDirectory => (CheckpointDir ?? LogDir) ?? string.Empty;

/// <summary>
/// Gets the base directory for storing main-store checkpoints
/// </summary>
public string MainStoreCheckpointBaseDirectory => Path.Combine(CheckpointBaseDirectory, "Store");

/// <summary>
/// Gets the base directory for storing object-store checkpoints
/// </summary>
public string ObjectStoreCheckpointBaseDirectory => Path.Combine(CheckpointBaseDirectory, "ObjectStore");

/// <summary>
/// Get the directory name for database checkpoints
/// </summary>
/// <param name="dbId">Database Id</param>
/// <returns>Directory name</returns>
public string GetCheckpointDirectoryName(int dbId) => $"checkpoints{(dbId == 0 ? string.Empty : $"_{dbId}")}";

/// <summary>
/// Get the directory for main-store database checkpoints
/// </summary>
/// <param name="dbId">Database Id</param>
/// <returns>Directory</returns>
public string GetMainStoreCheckpointDirectory(int dbId) =>
Path.Combine(MainStoreCheckpointBaseDirectory, GetCheckpointDirectoryName(dbId));

/// <summary>
/// Get the directory for object-store database checkpoints
/// </summary>
/// <param name="dbId">Database Id</param>
/// <returns>Directory</returns>
public string GetObjectStoreCheckpointDirectory(int dbId) =>
Path.Combine(ObjectStoreCheckpointBaseDirectory, GetCheckpointDirectoryName(dbId));

/// <summary>
/// Gets the base directory for storing AOF commits
/// </summary>
public string AppendOnlyFileBaseDirectory => CheckpointDir ?? string.Empty;

/// <summary>
/// Get the directory name for database AOF commits
/// </summary>
/// <param name="dbId">Database Id</param>
/// <returns>Directory name</returns>
public string GetAppendOnlyFileDirectoryName(int dbId) => $"AOF{(dbId == 0 ? string.Empty : $"_{dbId}")}";

/// <summary>
/// Get the directory for database AOF commits
/// </summary>
/// <param name="dbId">Database Id</param>
/// <returns>Directory</returns>
public string GetAppendOnlyFileDirectory(int dbId) =>
Path.Combine(AppendOnlyFileBaseDirectory, GetAppendOnlyFileDirectoryName(dbId));

/// <summary>
/// Constructor
/// </summary>
Expand Down Expand Up @@ -742,8 +800,7 @@ public KVSettings<byte[], IGarnetObject> GetObjectStoreSettings(ILogger logger,
/// </summary>
/// <param name="dbId">DB ID</param>
/// <param name="tsavoriteLogSettings">Tsavorite log settings</param>
/// <param name="aofDir"></param>
public void GetAofSettings(int dbId, out TsavoriteLogSettings tsavoriteLogSettings, out string aofDir)
public void GetAofSettings(int dbId, out TsavoriteLogSettings tsavoriteLogSettings)
{
tsavoriteLogSettings = new TsavoriteLogSettings
{
Expand All @@ -762,7 +819,7 @@ public void GetAofSettings(int dbId, out TsavoriteLogSettings tsavoriteLogSettin
throw new Exception("AOF Page size cannot be more than the AOF memory size.");
}

aofDir = Path.Combine(CheckpointDir ?? string.Empty, $"AOF{(dbId == 0 ? string.Empty : $"_{dbId}")}");
var aofDir = GetAppendOnlyFileDirectory(dbId);
tsavoriteLogSettings.LogCommitManager = new DeviceLogCommitCheckpointManager(
FastAofTruncate ? new NullNamedDeviceFactoryCreator() : DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(aofDir),
Expand Down Expand Up @@ -855,20 +912,8 @@ IDevice GetAofDevice(int dbId)
throw new Exception("Cannot use null device for AOF when cluster is enabled and you are not using main memory replication");
if (UseAofNullDevice) return new NullDevice();

return GetInitializedDeviceFactory(CheckpointDir)
.Get(new FileDescriptor($"AOF{(dbId == 0 ? string.Empty : $"_{dbId}")}", "aof.log"));
}

/// <summary>
/// Get device for logging database IDs
/// </summary>
/// <returns></returns>
public IDevice GetDatabaseIdsDevice()
{
if (MaxDatabases == 1) return new NullDevice();

return GetInitializedDeviceFactory(CheckpointDir)
.Get(new FileDescriptor($"databases", "ids.dat"));
return GetInitializedDeviceFactory(AppendOnlyFileBaseDirectory)
.Get(new FileDescriptor(GetAppendOnlyFileDirectoryName(dbId), "aof.log"));
}
}
}
2 changes: 1 addition & 1 deletion libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public sealed class StoreWrapper
/// <summary>
/// Definition for delegate creating a new logical database
/// </summary>
public delegate GarnetDatabase DatabaseCreatorDelegate(int dbId, out string storeCheckpointDir, out string aofDir);
public delegate GarnetDatabase DatabaseCreatorDelegate(int dbId);

/// <summary>
/// Number of active databases
Expand Down
15 changes: 13 additions & 2 deletions test/Garnet.test/Resp/ACL/RespCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7054,8 +7054,19 @@ await CheckCommandsAsync(

static async Task DoSwapDbAsync(GarnetClient client)
{
string val = await client.ExecuteForStringResultAsync("SWAPDB", ["0", "0"]);
ClassicAssert.AreEqual("OK", val);
try
{
// Currently SWAPDB does not support calling the command when multiple clients are connected to the server.
await client.ExecuteForStringResultAsync("SWAPDB", ["0", "1"]);
Assert.Fail("Shouldn't reach here, calling SWAPDB should fail.");
}
catch (Exception ex)
{
if (ex.Message == Encoding.ASCII.GetString(CmdStrings.RESP_ERR_NOAUTH))
throw;

ClassicAssert.AreEqual(Encoding.ASCII.GetString(CmdStrings.RESP_ERR_DBSWAP_UNSUPPORTED), ex.Message);
}
}
}

Expand Down

0 comments on commit 6e9d4c8

Please sign in to comment.