Skip to content

Commit

Permalink
Added ServerOperations to BDN + allocation improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
TalZaccai committed Mar 7, 2025
1 parent 53dc7c1 commit 9583a97
Show file tree
Hide file tree
Showing 18 changed files with 299 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-bdnbenchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
os: [ ubuntu-latest, windows-latest ]
framework: [ 'net8.0' ]
configuration: [ 'Release' ]
test: [ 'Operations.BasicOperations', 'Operations.ObjectOperations', 'Operations.HashObjectOperations', 'Operations.SortedSetOperations', 'Cluster.ClusterMigrate', 'Cluster.ClusterOperations', 'Lua.LuaScripts', 'Lua.LuaScriptCacheOperations','Lua.LuaRunnerOperations','Operations.CustomOperations', 'Operations.RawStringOperations', 'Operations.ScriptOperations', 'Operations.ModuleOperations', 'Operations.PubSubOperations', 'Network.BasicOperations', 'Network.RawStringOperations' ]
test: [ 'Operations.BasicOperations', 'Operations.ObjectOperations', 'Operations.HashObjectOperations', 'Operations.SortedSetOperations', 'Cluster.ClusterMigrate', 'Cluster.ClusterOperations', 'Lua.LuaScripts', 'Lua.LuaScriptCacheOperations','Lua.LuaRunnerOperations','Operations.CustomOperations', 'Operations.RawStringOperations', 'Operations.ScriptOperations', 'Operations.ModuleOperations', 'Operations.PubSubOperations', 'Operations.ServerOperations', 'Network.BasicOperations', 'Network.RawStringOperations' ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down
52 changes: 52 additions & 0 deletions benchmark/BDN.benchmark/Operations/ServerOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;
using Embedded.server;

namespace BDN.benchmark.Operations
{
/// <summary>
/// Benchmark for ServerOperations
/// </summary>
[MemoryDiagnoser]
public unsafe class ServerOperations : OperationsBase
{
static ReadOnlySpan<byte> SELECTUNSELECT => "*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n"u8;
Request selectUnselect;

static ReadOnlySpan<byte> SWAPDB => "*3\r\n$6\r\nSWAPDB\r\n$1\r\n1\r\n$1\r\n0\r\n"u8;
Request swapDb;

public override void GlobalSetup()
{
base.GlobalSetup();

SetupOperation(ref selectUnselect, SELECTUNSELECT);
SetupOperation(ref swapDb, SWAPDB);

// Pre-populate data in DB0
SlowConsumeMessage("*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\na\r\n"u8);
SlowConsumeMessage("*3\r\n$5\r\nLPUSH\r\n$1\r\nd\r\n$1\r\nf\r\n"u8);
SlowConsumeMessage("*3\r\n$4\r\nSADD\r\n$1\r\ne\r\n$1\r\nb\r\n"u8);

// Pre-populate data in DB1
SlowConsumeMessage("*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n"u8);
SlowConsumeMessage("*3\r\n$3\r\nSET\r\n$1\r\nb\r\n$1\r\nb\r\n"u8);
SlowConsumeMessage("*3\r\n$5\r\nLPUSH\r\n$1\r\nf\r\n$1\r\nh\r\n"u8);
SlowConsumeMessage("*3\r\n$4\r\nSADD\r\n$1\r\ng\r\n$1\r\ni\r\n"u8);
}

[Benchmark]
public void SelectUnselect()
{
Send(selectUnselect);
}

[Benchmark]
public void SwapDb()
{
Send(swapDb);
}
}
}
43 changes: 1 addition & 42 deletions libs/server/Custom/ExpandableMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,47 +76,6 @@ public bool TryGetValue(int id, out T value)
return true;
}

/// <summary>
/// If value exists for specified ID, return it,
/// otherwise set it using the provided value factory
/// </summary>
/// <param name="id">Item ID</param>
/// <param name="valueFactory">Value factory</param>
/// <param name="value">Returned value</param>
/// <param name="added">True if item was not previously set, but was set by this method</param>
/// <returns>True if item was previously initialized or set successfully</returns>
public bool TryGetOrSet(int id, Func<T> valueFactory, out T value, out bool added)
{
added = false;

// Try to get the current value, if value is already set, return it
if (this.TryGetValue(id, out var currValue) && !currValue.Equals(default(T)))
{
value = currValue;
return true;
}

mapLock.WriteLock();
try
{
// Try to get the current value, if value is already set, return it
if (this.TryGetValue(id, out currValue) && !currValue.Equals(default(T)))
{
value = currValue;
return true;
}

// Try to set value with expanding the map, if needed
value = valueFactory();
added = this.TrySetValueUnsafe(id, ref value, noExpansion: false);
return added;
}
finally
{
mapLock.WriteUnlock();
}
}

/// <summary>
/// Try to set item by ID
/// </summary>
Expand Down Expand Up @@ -236,7 +195,7 @@ private bool TryUpdateActualSize(int id)
/// <param name="value">Item value</param>
/// <param name="noExpansion">True if should not attempt to expand the underlying array</param>
/// <returns>True if assignment succeeded</returns>
private bool TrySetValueUnsafe(int id, ref T value, bool noExpansion)
internal bool TrySetValueUnsafe(int id, ref T value, bool noExpansion)
{
var idx = id - minId;
if (idx < 0 || idx >= maxSize) return false;
Expand Down
10 changes: 10 additions & 0 deletions libs/server/Custom/IDefaultChecker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Garnet.server
{
public interface IDefaultChecker
{
public bool IsDefault();
}
}
4 changes: 2 additions & 2 deletions libs/server/Databases/DatabaseManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal abstract class DatabaseManagerBase : IDatabaseManager
public readonly StoreWrapper StoreWrapper;

/// <inheritdoc/>
public abstract bool TryGetOrAddDatabase(int dbId, out GarnetDatabase db);
public abstract ref GarnetDatabase TryGetOrAddDatabase(int dbId, out bool success, out bool added);

/// <inheritdoc/>
public abstract bool TryPauseCheckpoints(int dbId);
Expand Down Expand Up @@ -119,7 +119,7 @@ public abstract Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit,
public abstract GarnetDatabase[] GetDatabasesSnapshot();

/// <inheritdoc/>
public abstract bool TryGetDatabase(int dbId, out GarnetDatabase db);
public abstract ref GarnetDatabase TryGetDatabase(int dbId, out bool found);

/// <inheritdoc/>
public abstract void FlushDatabase(bool unsafeTruncateLog, int dbId = 0);
Expand Down
13 changes: 7 additions & 6 deletions libs/server/Databases/IDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public interface IDatabaseManager : IDisposable
/// Try to get or add a new database
/// </summary>
/// <param name="dbId">Database ID</param>
/// <param name="db">Database</param>
/// <returns>True if database was retrieved or added successfully</returns>
public bool TryGetOrAddDatabase(int dbId, out GarnetDatabase db);
/// <param name="success">Database was found or added successfully</param>
/// <param name="added">True if database was added</param>
/// <returns>Reference to retrieved or added database</returns>
public ref GarnetDatabase TryGetOrAddDatabase(int dbId, out bool success, out bool added);

/// <summary>
/// Mark the beginning of a checkpoint by taking and a lock to avoid concurrent checkpointing
Expand Down Expand Up @@ -201,9 +202,9 @@ public Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit, Cancellati
/// Get database DB ID
/// </summary>
/// <param name="dbId">DB Id</param>
/// <param name="db">Database</param>
/// <returns>True if database found</returns>
public bool TryGetDatabase(int dbId, out GarnetDatabase db);
/// <param name="found">True if database was found</param>
/// <returns>Reference to database</returns>
public ref GarnetDatabase TryGetDatabase(int dbId, out bool found);

/// <summary>
/// Flush database with specified ID
Expand Down
102 changes: 77 additions & 25 deletions libs/server/Databases/MultiDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
Expand Down Expand Up @@ -120,7 +121,8 @@ public override void RecoverCheckpoint(bool replicaRecover = false, bool recover

foreach (var dbId in dbIdsToRecover)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Failed to retrieve or create database for checkpoint recovery (DB ID = {dbId}).");

try
Expand Down Expand Up @@ -377,7 +379,8 @@ public override void RecoverAOF()

foreach (var dbId in dbIdsToRecover)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Failed to retrieve or create database for AOF recovery (DB ID = {dbId}).");

RecoverDatabaseAOF(ref db);
Expand Down Expand Up @@ -519,7 +522,8 @@ public override void StartObjectSizeTrackers(CancellationToken token = default)
/// <inheritdoc/>
public override void Reset(int dbId = 0)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Database with ID {dbId} was not found.");

ResetDatabase(ref db);
Expand All @@ -542,7 +546,8 @@ public override void ResetRevivificationStats()

public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Database with ID {dbId} was not found.");

EnqueueDatabaseCommit(ref db, isMainStore, version, diskless);
Expand Down Expand Up @@ -570,18 +575,26 @@ public override bool TrySwapDatabases(int dbId1, int dbId2)
{
if (dbId1 == dbId2) return true;

if (!TryGetOrAddDatabase(dbId1, out var db1) ||
!TryGetOrAddDatabase(dbId2, out var db2))
ref var db1 = ref TryGetOrAddDatabase(dbId1, out var success, out _);
if (!success)
return false;

ref var db2 = ref TryGetOrAddDatabase(dbId2, out success, out _);
if (!success)
return false;

databasesLock.WriteLock();
try
{
var databaseMapSnapshot = databases.Map;
databaseMapSnapshot[dbId2] = db1;
var tmp = db1;
databaseMapSnapshot[dbId1] = db2;
databaseMapSnapshot[dbId2] = tmp;

var sessions = StoreWrapper.TcpServer?.ActiveConsumers().ToArray();
if (sessions == null) return true;
if (sessions.Length > 1) return false;

var sessions = StoreWrapper.TcpServer.ActiveConsumers();
foreach (var session in sessions)
{
if (session is not RespServerSession respServerSession) continue;
Expand Down Expand Up @@ -611,29 +624,62 @@ protected override ref GarnetDatabase GetDatabaseByRef(int dbId = 0)

public override FunctionsState CreateFunctionsState(int dbId = 0)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Database with ID {dbId} was not found.");

return new(db.AppendOnlyFile, db.VersionMap, StoreWrapper.customCommandManager, null, db.ObjectStoreSizeTracker,
StoreWrapper.GarnetObjectSerializer);
}

/// <inheritdoc/>
public override bool TryGetOrAddDatabase(int dbId, out GarnetDatabase db)
public override ref GarnetDatabase TryGetOrAddDatabase(int dbId, out bool success, out bool added)
{
if (!databases.TryGetOrSet(dbId, () => CreateDatabaseDelegate(dbId, out _, out _), out db, out var added))
return false;
added = false;
success = false;

if (added)
HandleDatabaseAdded(dbId);
var databasesMapSize = databases.ActualSize;
var databasesMapSnapshot = databases.Map;

return true;
if (dbId >= 0 && dbId < databasesMapSize && !databasesMapSnapshot[dbId].IsDefault())
{
success = true;
return ref databasesMapSnapshot[dbId];
}

databases.mapLock.WriteLock();

try
{
if (dbId >= 0 && dbId < databasesMapSize && !databasesMapSnapshot[dbId].IsDefault())
{
success = true;
return ref databasesMapSnapshot[dbId];
}

var db = CreateDatabaseDelegate(dbId, out _, out _);
if (!databases.TrySetValueUnsafe(dbId, ref db, false))
return ref GarnetDatabase.Empty;
}
finally
{
databases.mapLock.WriteUnlock();
}

added = true;
success = true;

HandleDatabaseAdded(dbId);

databasesMapSnapshot = databases.Map;
return ref databasesMapSnapshot[dbId];
}

/// <inheritdoc/>
public override bool TryPauseCheckpoints(int dbId)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Database with ID {dbId} was not found.");

return TryPauseCheckpoints(ref db);
Expand Down Expand Up @@ -668,33 +714,39 @@ public override void ResumeCheckpoints(int dbId)
}

/// <inheritdoc/>
public override bool TryGetDatabase(int dbId, out GarnetDatabase db)
public override ref GarnetDatabase TryGetDatabase(int dbId, out bool found)
{
found = false;

var databasesMapSize = databases.ActualSize;
var databasesMapSnapshot = databases.Map;

if (dbId == 0)
{
db = databasesMapSnapshot[0];
Debug.Assert(!db.IsDefault());
return true;
Debug.Assert(!databasesMapSnapshot[0].IsDefault());
found = true;
return ref databasesMapSnapshot[0];
}

// Check if database already exists
if (dbId < databasesMapSize)
{
db = databasesMapSnapshot[dbId];
if (!db.IsDefault()) return true;
if (!databasesMapSnapshot[dbId].IsDefault())
{
found = true;
return ref databasesMapSnapshot[dbId];
}
}

// Try to retrieve or add database
return TryGetOrAddDatabase(dbId, out db);
found = false;
return ref GarnetDatabase.Empty;
}

/// <inheritdoc/>
public override void FlushDatabase(bool unsafeTruncateLog, int dbId = 0)
{
if (!TryGetOrAddDatabase(dbId, out var db))
ref var db = ref TryGetOrAddDatabase(dbId, out var success, out _);
if (!success)
throw new GarnetException($"Database with ID {dbId} was not found.");

FlushDatabase(ref db, unsafeTruncateLog);
Expand Down
Loading

0 comments on commit 9583a97

Please sign in to comment.