Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-Database Support #1005

Draft
wants to merge 98 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
e472d91
Added MaxDatabases option
TalZaccai Jan 28, 2025
ba2f091
wip
TalZaccai Jan 29, 2025
b1f44aa
wip
TalZaccai Jan 29, 2025
d2a259f
wip
TalZaccai Jan 29, 2025
1825fbb
wip
TalZaccai Jan 30, 2025
c4d98b4
wip
TalZaccai Jan 30, 2025
41bdbd7
wip
TalZaccai Jan 31, 2025
6bca54c
wip
TalZaccai Jan 31, 2025
21aa35a
wip
TalZaccai Feb 1, 2025
e8f465d
aof
TalZaccai Feb 5, 2025
cabd3a6
wip
TalZaccai Feb 5, 2025
cf85d17
wip
TalZaccai Feb 5, 2025
547dd94
wip
TalZaccai Feb 5, 2025
d1fae46
wip
TalZaccai Feb 6, 2025
71efd73
wip
TalZaccai Feb 6, 2025
c31b7bf
bugfixes
TalZaccai Feb 6, 2025
a4a7850
wip
TalZaccai Feb 6, 2025
eeefc70
tests
TalZaccai Feb 7, 2025
8ae5ad2
merging from latest main
TalZaccai Feb 7, 2025
1887d0a
format
TalZaccai Feb 7, 2025
2701635
small fix
TalZaccai Feb 7, 2025
84a30bc
wip
TalZaccai Feb 8, 2025
b6ede42
Merge branch 'main' into talzacc/multi_db
TalZaccai Feb 8, 2025
ee68ec5
test
TalZaccai Feb 8, 2025
4c720ea
Merge branch 'talzacc/multi_db' of https://github.com/microsoft/garne…
TalZaccai Feb 8, 2025
535203e
simplify dispose logic for gcs at AofSyncTask
vazois Feb 10, 2025
b2a6347
merge with latest main + ignore multithreaded tests
TalZaccai Feb 10, 2025
e216a1f
Merge branch 'talzacc/multi_db' of https://github.com/microsoft/garne…
TalZaccai Feb 10, 2025
6e5645e
fix
TalZaccai Feb 10, 2025
aeddd71
format
TalZaccai Feb 10, 2025
78251c6
fix
TalZaccai Feb 11, 2025
2c82c2d
wip
TalZaccai Feb 11, 2025
989e74e
test
TalZaccai Feb 11, 2025
98ee966
Merge branch 'main' into talzacc/multi_db
TalZaccai Feb 11, 2025
3fb53be
readding multithreading tests
TalZaccai Feb 11, 2025
a9d8f61
test
TalZaccai Feb 11, 2025
44de386
test
TalZaccai Feb 11, 2025
0964240
wip
TalZaccai Feb 11, 2025
cc6dfa8
test
TalZaccai Feb 11, 2025
2a40dfa
format
TalZaccai Feb 11, 2025
de48e83
wip
TalZaccai Feb 12, 2025
d2ff6e3
Added FLUSHALL + tests
TalZaccai Feb 12, 2025
64e610a
Ignore LC MT test
TalZaccai Feb 12, 2025
6782af6
format
TalZaccai Feb 12, 2025
d0bf7f6
Added DB ID to client info
TalZaccai Feb 12, 2025
0532ede
SAVE, BGSAVE, LASTSAVE with ID - not tested yet
TalZaccai Feb 12, 2025
ca47b00
ensure waitForSync before dispose
vazois Feb 12, 2025
e881dbc
wip
TalZaccai Feb 12, 2025
0cc431a
wip
TalZaccai Feb 14, 2025
9624d31
wip
TalZaccai Feb 14, 2025
ae6026c
Adding HCOLLECT info & docs to GarnetCommandDocs/Info & updating Resp…
TalZaccai Feb 14, 2025
68955c0
Merge branch 'talzacc/cmd_info_docs_update' into talzacc/multi_db
TalZaccai Feb 14, 2025
b8e75e1
Updated command info & docs for SAVE, BGSAVE, LASTSAVE & COMMITAOF
TalZaccai Feb 14, 2025
94778d4
format
TalZaccai Feb 14, 2025
8cafce3
bugfixes
TalZaccai Feb 14, 2025
ab138ed
Adding SPUBLISH, SSUBSCRIBE TO GarnetCommandsInfo.json
TalZaccai Feb 14, 2025
697365f
Merge branch 'talzacc/cmd_info_docs_update' into talzacc/multi_db
TalZaccai Feb 14, 2025
923bafe
wip
TalZaccai Feb 19, 2025
8ab2775
wip
TalZaccai Feb 19, 2025
90824ce
wip
TalZaccai Feb 19, 2025
2ddec8a
IDatabaseManager refactor - wip (broken)
TalZaccai Feb 21, 2025
3869062
wip (broken)
TalZaccai Feb 21, 2025
0a20f87
wip
TalZaccai Feb 24, 2025
d7f5dd7
wip
TalZaccai Feb 26, 2025
b4d1e80
quit code change
TalZaccai Feb 27, 2025
987bdb9
wip
TalZaccai Feb 28, 2025
2acfcc4
wip
TalZaccai Feb 28, 2025
32565ce
Fixing SWAPDB
TalZaccai Mar 3, 2025
1ab57d4
wip
TalZaccai Mar 3, 2025
102d65a
wip
TalZaccai Mar 3, 2025
c27ab01
add explicit fail on fail to dispose
vazois Mar 4, 2025
56a8625
replayStoreWrapper should not create database
vazois Mar 4, 2025
9f5ff7b
wip
TalZaccai Mar 4, 2025
b19dd07
Merge branch 'talzacc/multi_db_refactor' of https://github.com/micros…
TalZaccai Mar 4, 2025
bc1db5f
wip - tests passing
TalZaccai Mar 5, 2025
32d5c06
wip
TalZaccai Mar 5, 2025
db7e587
format
TalZaccai Mar 5, 2025
c6b1029
merging with latest main
TalZaccai Mar 5, 2025
8476e35
format
TalZaccai Mar 5, 2025
52e8339
some merge fixes
TalZaccai Mar 5, 2025
d6567a1
format
TalZaccai Mar 5, 2025
8b3e300
bugfix
TalZaccai Mar 5, 2025
e5c9cfe
Merge branch 'talzacc/multi_db_refactor' into talzacc/multi_db
TalZaccai Mar 5, 2025
53dc7c1
fix
TalZaccai Mar 5, 2025
9583a97
Added ServerOperations to BDN + allocation improvements
TalZaccai Mar 7, 2025
a5750a1
Merge branch 'main' into talzacc/multi_db
TalZaccai Mar 7, 2025
1943070
Added website docs
TalZaccai Mar 7, 2025
6fa0671
Merge branch 'talzacc/multi_db' of https://github.com/microsoft/garne…
TalZaccai Mar 7, 2025
4f49303
Fixing ACL test
TalZaccai Mar 7, 2025
4e71803
Merge branch 'main' into talzacc/multi_db
TalZaccai Mar 7, 2025
1b135d9
Initialize databaseManager as single database unless multi explicitly…
TalZaccai Mar 8, 2025
8348e8a
merging from latest main
TalZaccai Mar 8, 2025
fbecd33
merge fix + format
TalZaccai Mar 8, 2025
ada9801
Merge branch 'main' into talzacc/multi_db
TalZaccai Mar 8, 2025
84f397e
merging from main + removing unnecessary usings
TalZaccai Mar 8, 2025
cb29622
bugfix
TalZaccai Mar 8, 2025
6e9d4c8
Test fix + consolidation of getting directory paths for checkpointing…
TalZaccai Mar 8, 2025
62159e6
Updating DB IDs after swap
TalZaccai Mar 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-bdnbenchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
os: [ ubuntu-latest, windows-latest ]
framework: [ 'net8.0' ]
configuration: [ 'Release' ]
test: [ 'Operations.BasicOperations', 'Operations.ObjectOperations', 'Operations.HashObjectOperations', 'Operations.SortedSetOperations', 'Cluster.ClusterMigrate', 'Cluster.ClusterOperations', 'Lua.LuaScripts', 'Lua.LuaScriptCacheOperations','Lua.LuaRunnerOperations','Operations.CustomOperations', 'Operations.RawStringOperations', 'Operations.ScriptOperations', 'Operations.ModuleOperations', 'Operations.PubSubOperations', 'Network.BasicOperations', 'Network.RawStringOperations' ]
test: [ 'Operations.BasicOperations', 'Operations.ObjectOperations', 'Operations.HashObjectOperations', 'Operations.SortedSetOperations', 'Cluster.ClusterMigrate', 'Cluster.ClusterOperations', 'Lua.LuaScripts', 'Lua.LuaScriptCacheOperations','Lua.LuaRunnerOperations','Operations.CustomOperations', 'Operations.RawStringOperations', 'Operations.ScriptOperations', 'Operations.ModuleOperations', 'Operations.PubSubOperations', 'Operations.ServerOperations', 'Network.BasicOperations', 'Network.RawStringOperations' ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down
52 changes: 52 additions & 0 deletions benchmark/BDN.benchmark/Operations/ServerOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;
using Embedded.server;

namespace BDN.benchmark.Operations
{
/// <summary>
/// Benchmark for ServerOperations
/// </summary>
[MemoryDiagnoser]
public unsafe class ServerOperations : OperationsBase
{
static ReadOnlySpan<byte> SELECTUNSELECT => "*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n"u8;
Request selectUnselect;

static ReadOnlySpan<byte> SWAPDB => "*3\r\n$6\r\nSWAPDB\r\n$1\r\n1\r\n$1\r\n0\r\n"u8;
Request swapDb;

public override void GlobalSetup()
{
base.GlobalSetup();

SetupOperation(ref selectUnselect, SELECTUNSELECT);
SetupOperation(ref swapDb, SWAPDB);

// Pre-populate data in DB0
SlowConsumeMessage("*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\na\r\n"u8);
SlowConsumeMessage("*3\r\n$5\r\nLPUSH\r\n$1\r\nd\r\n$1\r\nf\r\n"u8);
SlowConsumeMessage("*3\r\n$4\r\nSADD\r\n$1\r\ne\r\n$1\r\nb\r\n"u8);

// Pre-populate data in DB1
SlowConsumeMessage("*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n"u8);
SlowConsumeMessage("*3\r\n$3\r\nSET\r\n$1\r\nb\r\n$1\r\nb\r\n"u8);
SlowConsumeMessage("*3\r\n$5\r\nLPUSH\r\n$1\r\nf\r\n$1\r\nh\r\n"u8);
SlowConsumeMessage("*3\r\n$4\r\nSADD\r\n$1\r\ng\r\n$1\r\ni\r\n"u8);
}

[Benchmark]
public void SelectUnselect()
{
Send(selectUnselect);
}

[Benchmark]
public void SwapDb()
{
Send(swapDb);
}
}
}
4 changes: 2 additions & 2 deletions libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ public bool MigrateKeys()
return false;

// Migrate main store keys
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy);
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequency);
if (!MigrateKeysFromMainStore())
return false;

// Migrate object store keys
if (!clusterProvider.serverOptions.DisableObjects)
{
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy);
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequency);
if (!MigrateKeysFromObjectStore())
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/RespClusterMigrateCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private void TrackImportProgress(int keyCount, bool isMainStore, bool completed
{
totalKeyCount += keyCount;
var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
if (completed || lastLog == 0 || duration >= clusterProvider.storeWrapper.loggingFrequncy)
if (completed || lastLog == 0 || duration >= clusterProvider.storeWrapper.loggingFrequency)
{
logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount})", completed ? "COMPLETED" : "IMPORTING", isMainStore, totalKeyCount.ToString("N0"));
lastLog = Stopwatch.GetTimestamp();
Expand Down
7 changes: 6 additions & 1 deletion libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ internal sealed class Options
[Option("unixsocketperm", Required = false, HelpText = "Unix socket permissions in octal (Unix platforms only)")]
public int UnixSocketPermission { get; set; }

[IntRangeValidation(1, 256, isRequired: false)]
[Option("max-databases", Required = false, HelpText = "Max number of logical databases allowed in a single Garnet server instance")]
public int MaxDatabases { get; set; }

/// <summary>
/// This property contains all arguments that were not parsed by the command line argument parser
/// </summary>
Expand Down Expand Up @@ -813,7 +817,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
SkipRDBRestoreChecksumValidation = SkipRDBRestoreChecksumValidation.GetValueOrDefault(),
LuaOptions = EnableLua.GetValueOrDefault() ? new LuaOptions(LuaMemoryManagementMode, LuaScriptMemoryLimit, LuaScriptTimeoutMs == 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromMilliseconds(LuaScriptTimeoutMs), LuaLoggingMode, logger) : null,
UnixSocketPath = UnixSocketPath,
UnixSocketPermission = unixSocketPermissions
UnixSocketPermission = unixSocketPermissions,
MaxDatabases = MaxDatabases,
};
}

Expand Down
3 changes: 3 additions & 0 deletions libs/host/Configuration/Redis/RedisOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ internal class RedisOptions

[RedisOption("slowlog-max-len", nameof(Options.SlowLogMaxEntries))]
public Option<int> SlowLogMaxLen { get; set; }

[RedisOption("databases", nameof(Options.MaxDatabases))]
public Option<int> Databases { get; set; }
}

/// <summary>
Expand Down
147 changes: 77 additions & 70 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ static string GetVersion()

private readonly GarnetServerOptions opts;
private IGarnetServer server;
private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> store;
private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> objectStore;
private IDevice aofDevice;
private TsavoriteLog appendOnlyFile;
private SubscribeBroker subscribeBroker;
private KVSettings<SpanByte, SpanByte> kvSettings;
private KVSettings<byte[], IGarnetObject> objKvSettings;
Expand Down Expand Up @@ -227,35 +223,44 @@ private void InitializeServer()
if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");

CreateMainStore(clusterFactory, out var checkpointDir);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);
StoreWrapper.DatabaseCreatorDelegate createDatabaseDelegate = (int dbId) =>
CreateDatabase(dbId, opts, clusterFactory, customCommandManager);

if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker(null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true, logger);

CreateAOF();

logger?.LogTrace("TLS is {tlsEnabled}", opts.TlsOptions == null ? "disabled" : "enabled");

if (logger != null)
{
var configMemoryLimit = (store.IndexSize * 64) + store.Log.MaxMemorySizeBytes + (store.ReadCache?.MaxMemorySizeBytes ?? 0) + (appendOnlyFile?.MaxMemorySizeBytes ?? 0);
if (objectStore != null)
configMemoryLimit += objectStore.IndexSize * 64 + objectStore.Log.MaxMemorySizeBytes + (objectStore.ReadCache?.MaxMemorySizeBytes ?? 0) + (objectStoreSizeTracker?.TargetSize ?? 0) + (objectStoreSizeTracker?.ReadCacheTargetSize ?? 0);
logger.LogInformation("Total configured memory limit: {configMemoryLimit}", configMemoryLimit);
}

if (opts.EndPoint is UnixDomainSocketEndPoint)
{
// Delete existing unix socket file, if it exists.
File.Delete(opts.UnixSocketPath);
}

// Create Garnet TCP server if none was provided.
this.server ??= new GarnetServerTcp(opts.EndPoint, 0, opts.TlsOptions, opts.NetworkSendThrottleMax, opts.NetworkConnectionLimit, opts.UnixSocketPath, opts.UnixSocketPermission, logger);
this.server ??= new GarnetServerTcp(opts.EndPoint, 0, opts.TlsOptions, opts.NetworkSendThrottleMax,
opts.NetworkConnectionLimit, opts.UnixSocketPath, opts.UnixSocketPermission, logger);

storeWrapper = new StoreWrapper(version, RedisProtocolVersion, server, store, objectStore, objectStoreSizeTracker,
customCommandManager, appendOnlyFile, opts, subscribeBroker, clusterFactory: clusterFactory, loggerFactory: loggerFactory);
storeWrapper = new StoreWrapper(version, RedisProtocolVersion, server,
customCommandManager, opts, subscribeBroker,
createDatabaseDelegate: createDatabaseDelegate,
clusterFactory: clusterFactory,
loggerFactory: loggerFactory);

if (logger != null)
{
var configMemoryLimit = (storeWrapper.store.IndexSize * 64) +
storeWrapper.store.Log.MaxMemorySizeBytes +
(storeWrapper.store.ReadCache?.MaxMemorySizeBytes ?? 0) +
(storeWrapper.appendOnlyFile?.MaxMemorySizeBytes ?? 0);
if (storeWrapper.objectStore != null)
configMemoryLimit += storeWrapper.objectStore.IndexSize * 64 +
storeWrapper.objectStore.Log.MaxMemorySizeBytes +
(storeWrapper.objectStore.ReadCache?.MaxMemorySizeBytes ?? 0) +
(storeWrapper.objectStoreSizeTracker?.TargetSize ?? 0) +
(storeWrapper.objectStoreSizeTracker?.ReadCacheTargetSize ?? 0);
logger.LogInformation("Total configured memory limit: {configMemoryLimit}", configMemoryLimit);
}

// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, subscribeBroker);
Expand All @@ -270,6 +275,17 @@ private void InitializeServer()
LoadModules(customCommandManager);
}

private GarnetDatabase CreateDatabase(int dbId, GarnetServerOptions serverOptions, ClusterFactory clusterFactory,
CustomCommandManager customCommandManager)
{
var store = CreateMainStore(dbId, clusterFactory);
var objectStore = CreateObjectStore(dbId, clusterFactory, customCommandManager, out var objectStoreSizeTracker);
var (aofDevice, aof) = CreateAOF(dbId);
return new GarnetDatabase(dbId, store, objectStore, objectStoreSizeTracker, aofDevice, aof,
serverOptions.AdjustedIndexMaxCacheLines == 0,
serverOptions.AdjustedObjectStoreIndexMaxCacheLines == 0);
}

private void LoadModules(CustomCommandManager customCommandManager)
{
if (opts.LoadModuleCS == null)
Expand All @@ -294,82 +310,77 @@ private void LoadModules(CustomCommandManager customCommandManager)
}
}

private void CreateMainStore(IClusterFactory clusterFactory, out string checkpointDir)
private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> CreateMainStore(int dbId, IClusterFactory clusterFactory)
{
kvSettings = opts.GetSettings(loggerFactory, out logFactory);

checkpointDir = opts.CheckpointDir ?? opts.LogDir;

// Run checkpoint on its own thread to control p99
kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;

if (opts.EnableCluster)
{
kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), isMainStore: true, logger);
}
else
{
kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), removeOutdated: true);
}
var baseName = opts.GetMainStoreCheckpointDirectory(dbId);
var defaultNamingScheme = new DefaultCheckpointNamingScheme(baseName);

store = new(kvSettings
kvSettings.CheckpointManager = opts.EnableCluster ?
clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, isMainStore: true, logger) :
new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, removeOutdated: true);

return new TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator>(kvSettings
, StoreFunctions<SpanByte, SpanByte>.Create()
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
}

private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandManager customCommandManager, string CheckpointDir, out CacheSizeTracker objectStoreSizeTracker)
private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> CreateObjectStore(int dbId, IClusterFactory clusterFactory, CustomCommandManager customCommandManager, out CacheSizeTracker objectStoreSizeTracker)
{
objectStoreSizeTracker = null;
if (!opts.DisableObjects)
{
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);
if (opts.DisableObjects)
return null;

// Run checkpoint on its own thread to control p99
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);

// Run checkpoint on its own thread to control p99
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;

var baseName = opts.GetObjectStoreCheckpointDirectory(dbId);
var defaultNamingScheme = new DefaultCheckpointNamingScheme(baseName);

objKvSettings.CheckpointManager = opts.EnableCluster ?
clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, isMainStore: false, logger) :
new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, removeOutdated: true);

var objStore = new TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator>(
objKvSettings,
StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(),
() => new ByteArrayBinaryObjectSerializer(),
() => new GarnetObjectSerializer(customCommandManager)),
(allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));

if (objHeapMemorySize > 0 || objReadCacheHeapMemorySize > 0)
objectStoreSizeTracker = new CacheSizeTracker(objStore, objKvSettings, objHeapMemorySize, objReadCacheHeapMemorySize,
this.loggerFactory);

return objStore;

if (opts.EnableCluster)
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
isMainStore: false, logger);
else
objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
removeOutdated: true);

objectStore = new(objKvSettings
, StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(),
() => new ByteArrayBinaryObjectSerializer(),
() => new GarnetObjectSerializer(customCommandManager))
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));

if (objHeapMemorySize > 0 || objReadCacheHeapMemorySize > 0)
objectStoreSizeTracker = new CacheSizeTracker(objectStore, objKvSettings, objHeapMemorySize, objReadCacheHeapMemorySize,
this.loggerFactory);
}
}

private void CreateAOF()
private (IDevice, TsavoriteLog) CreateAOF(int dbId)
{
if (opts.EnableAOF)
{
if (opts.FastAofTruncate && opts.CommitFrequencyMs != -1)
throw new Exception("Need to set CommitFrequencyMs to -1 (manual commits) with MainMemoryReplication");

opts.GetAofSettings(out var aofSettings);
aofDevice = aofSettings.LogDevice;
appendOnlyFile = new TsavoriteLog(aofSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteLog [aof]"));

opts.GetAofSettings(dbId, out var aofSettings);
var aofDevice = aofSettings.LogDevice;
var appendOnlyFile = new TsavoriteLog(aofSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteLog [aof]"));
if (opts.CommitFrequencyMs < 0 && opts.WaitForCommit)
throw new Exception("Cannot use CommitWait with manual commits");
return;
return (aofDevice, appendOnlyFile);
}

if (opts.CommitFrequencyMs != 0 || opts.WaitForCommit)
throw new Exception("Cannot use CommitFrequencyMs or CommitWait without EnableAOF");
return (null, null);
}

/// <summary>
Expand Down Expand Up @@ -415,13 +426,9 @@ private void InternalDispose()
Provider?.Dispose();
server.Dispose();
subscribeBroker?.Dispose();
store.Dispose();
appendOnlyFile?.Dispose();
aofDevice?.Dispose();
kvSettings.LogDevice?.Dispose();
if (!opts.DisableObjects)
{
objectStore.Dispose();
objKvSettings.LogDevice?.Dispose();
objKvSettings.ObjectLogDevice?.Dispose();
}
Expand Down
5 changes: 4 additions & 1 deletion libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -384,5 +384,8 @@
"UnixSocketPath": null,

/* Unix socket permissions in octal (Unix platforms only) */
"UnixSocketPermission": 0
"UnixSocketPermission": 0,

/* Max number of logical databases allowed in a single Garnet server instance */
"MaxDatabases": 16
}
Loading