Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single state machine across main and object stores #1077

Draft
wants to merge 57 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
e1db784
Simpler state machine for checkpointing
badrishc Feb 24, 2025
49b9c16
cleanup
badrishc Feb 24, 2025
d111602
updates
badrishc Feb 25, 2025
9f838f7
remove dead code
badrishc Feb 25, 2025
c907987
updates
badrishc Feb 26, 2025
e715503
updates
badrishc Feb 26, 2025
7529ade
updates
badrishc Feb 26, 2025
6da12ce
update
badrishc Feb 27, 2025
e24720c
kill code
badrishc Feb 27, 2025
1a8ba12
updates
badrishc Feb 27, 2025
a1b87a9
simplify LightEpoch
badrishc Feb 27, 2025
69c07e9
move epvs to test
badrishc Feb 27, 2025
6ec5233
nits
badrishc Feb 27, 2025
9599202
updates
badrishc Feb 27, 2025
a24e7e6
updates
badrishc Feb 28, 2025
223abf6
formatting
badrishc Feb 28, 2025
4d17abb
fix garnet
badrishc Feb 28, 2025
cc4e677
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Feb 28, 2025
2b830b1
nit
badrishc Feb 28, 2025
d92360a
comments
badrishc Feb 28, 2025
8145607
remove manualLockingActive
badrishc Feb 28, 2025
4ba164c
update the barrier condition and remove checkpoint version switch bar…
badrishc Feb 28, 2025
aa36bf4
remove INTERMEDIATE state
badrishc Mar 1, 2025
6119e97
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Mar 1, 2025
528c227
Remove CPR_SHIFT_DETECTED and LartchDestination.Retry
badrishc Mar 1, 2025
2067cb9
add black box test for checkpointing version switch state machine
badrishc Mar 3, 2025
174ae9a
add transaction test
badrishc Mar 3, 2025
8f72d00
clean the test
badrishc Mar 3, 2025
d304ecd
cleanup
badrishc Mar 3, 2025
36a5e18
Refactor the phases of various machines
badrishc Mar 4, 2025
0c68988
format
badrishc Mar 4, 2025
b4763c9
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Mar 4, 2025
efd8bd2
initial commit
badrishc Mar 4, 2025
511fe75
remove sessionName
badrishc Mar 6, 2025
8ed6101
update LightEpoch based on PR comment
badrishc Mar 6, 2025
17f04ac
fix break
badrishc Mar 6, 2025
6dfc383
Use session-local isAcquiredLockable as signal for threads to decide …
badrishc Mar 6, 2025
f1ffaec
address review comments
badrishc Mar 6, 2025
3e3a377
nit
badrishc Mar 6, 2025
7c40678
Merge from base
badrishc Mar 6, 2025
cdd91f9
Merge remote-tracking branch 'origin/main' into badrishc/state-machin…
badrishc Mar 7, 2025
aa3113c
Merge branch 'badrishc/state-machine-v2' into badrishc/two-store-chec…
badrishc Mar 7, 2025
1c49869
minor code move
badrishc Mar 7, 2025
443b8ab
use shared epoch across stores
badrishc Mar 7, 2025
3071098
nit
badrishc Mar 7, 2025
aa4b628
Merge from main
badrishc Mar 7, 2025
f8a1899
add unified checkpointing logic to garnet
badrishc Mar 8, 2025
9930388
Merge remote-tracking branch 'origin/main' into badrishc/two-store-ch…
badrishc Mar 8, 2025
b6df00c
nit
badrishc Mar 8, 2025
5b4c646
fix
badrishc Mar 8, 2025
3e8c2c7
use correct SMD
badrishc Mar 8, 2025
576f92d
nit
badrishc Mar 8, 2025
76430c1
fix
badrishc Mar 8, 2025
7b7bec5
nit
badrishc Mar 8, 2025
bbad4f0
fix test as versions are different due to unified ckpt
badrishc Mar 8, 2025
af870fb
add comment
badrishc Mar 8, 2025
2bccee7
remove targetVersion from checkpoint API, versions always progress by 1.
badrishc Mar 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ private void InitializeServer()
if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");

opts.Initialize(loggerFactory);
CreateMainStore(clusterFactory, out var checkpointDir);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);

Expand Down Expand Up @@ -324,7 +325,7 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana
objectStoreSizeTracker = null;
if (!opts.DisableObjects)
{
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
objKvSettings = opts.GetObjectStoreSettings(loggerFactory,
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);

// Run checkpoint on its own thread to control p99
Expand Down
5 changes: 2 additions & 3 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,10 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic
break;
case AofEntryType.MainStoreCheckpointStartCommit:
if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion)
_ = storeWrapper.TakeCheckpoint(false, StoreType.Main, logger);
_ = storeWrapper.TakeCheckpoint(false, logger);
break;
case AofEntryType.ObjectStoreCheckpointStartCommit:
if (asReplica && header.storeVersion > storeWrapper.objectStore.CurrentVersion)
_ = storeWrapper.TakeCheckpoint(false, StoreType.Object, logger);
// With unified checkpoint, we do not need to take object store checkpoint separately
break;
case AofEntryType.MainStoreCheckpointEndCommit:
case AofEntryType.ObjectStoreCheckpointEndCommit:
Expand Down
4 changes: 2 additions & 2 deletions libs/server/Resp/AdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ private bool NetworkSAVE()
return AbortWithWrongNumberOfArguments(nameof(RespCommand.SAVE));
}

if (!storeWrapper.TakeCheckpoint(false, StoreType.All, logger))
if (!storeWrapper.TakeCheckpoint(false, logger))
{
while (!RespWriteUtils.TryWriteError("ERR checkpoint already in progress"u8, ref dcurr, dend))
SendAndReset();
Expand Down Expand Up @@ -853,7 +853,7 @@ private bool NetworkBGSAVE()
return AbortWithWrongNumberOfArguments(nameof(RespCommand.BGSAVE));
}

var success = storeWrapper.TakeCheckpoint(true, StoreType.All, logger);
var success = storeWrapper.TakeCheckpoint(true, logger);
if (success)
{
while (!RespWriteUtils.TryWriteSimpleString("Background saving started"u8, ref dcurr, dend))
Expand Down
40 changes: 32 additions & 8 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,16 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public UnixFileMode UnixSocketPermission { get; set; }

/// <summary>
/// Epoch instance used by server
/// </summary>
public LightEpoch Epoch;

/// <summary>
/// Common state machine driver used by Garnet
/// </summary>
public StateMachineDriver StateMachineDriver;

/// <summary>
/// Constructor
/// </summary>
Expand All @@ -454,6 +464,16 @@ public GarnetServerOptions(ILogger logger = null) : base(logger)
this.logger = logger;
}

/// <summary>
/// Initialize Garnet server options
/// </summary>
/// <param name="loggerFactory"></param>
public void Initialize(ILoggerFactory loggerFactory = null)
{
Epoch = new LightEpoch();
StateMachineDriver = new(Epoch, loggerFactory?.CreateLogger($"StateMachineDriver"));
}

/// <summary>
/// Get main store settings
/// </summary>
Expand All @@ -466,15 +486,16 @@ public KVSettings<SpanByte, SpanByte> GetSettings(ILoggerFactory loggerFactory,
if (MutablePercent is < 10 or > 95)
throw new Exception("MutablePercent must be between 10 and 95");

KVSettings<SpanByte, SpanByte> kvSettings = new(baseDir: null, logger: logger);

var indexCacheLines = IndexSizeCachelines("hash index size", IndexSize);
kvSettings = new()

KVSettings<SpanByte, SpanByte> kvSettings = new()
{
IndexSize = indexCacheLines * 64L,
PreallocateLog = false,
MutableFraction = MutablePercent / 100.0,
PageSize = 1L << PageSizeBits(),
Epoch = Epoch,
StateMachineDriver = StateMachineDriver,
loggerFactory = loggerFactory,
logger = loggerFactory?.CreateLogger("TsavoriteKV [main]")
};
Expand Down Expand Up @@ -618,23 +639,26 @@ public static int MemorySizeBits(string memorySize, string storePageSize, out in
/// <summary>
/// Get KVSettings for the object store log
/// </summary>
public KVSettings<byte[], IGarnetObject> GetObjectStoreSettings(ILogger logger, out long objHeapMemorySize, out long objReadCacheHeapMemorySize)
public KVSettings<byte[], IGarnetObject> GetObjectStoreSettings(ILoggerFactory loggerFactory, out long objHeapMemorySize, out long objReadCacheHeapMemorySize)
{
objReadCacheHeapMemorySize = default;

if (ObjectStoreMutablePercent is < 10 or > 95)
throw new Exception("ObjectStoreMutablePercent must be between 10 and 95");

KVSettings<byte[], IGarnetObject> kvSettings = new(baseDir: null, logger: logger);

var indexCacheLines = IndexSizeCachelines("object store hash index size", ObjectStoreIndexSize);
kvSettings = new()
KVSettings<byte[], IGarnetObject> kvSettings = new()
{
IndexSize = indexCacheLines * 64L,
PreallocateLog = false,
MutableFraction = ObjectStoreMutablePercent / 100.0,
PageSize = 1L << ObjectStorePageSizeBits()
PageSize = 1L << ObjectStorePageSizeBits(),
Epoch = Epoch,
StateMachineDriver = StateMachineDriver,
loggerFactory = loggerFactory,
logger = loggerFactory?.CreateLogger("TsavoriteKV [obj]")
};

logger?.LogInformation("[Object Store] Using page size of {PageSize}", PrettySize(kvSettings.PageSize));
logger?.LogInformation("[Object Store] Each page can hold ~{PageSize} key-value pairs of objects", kvSettings.PageSize / 24);

Expand Down
70 changes: 37 additions & 33 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -761,28 +761,27 @@ public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
}

// Necessary to take a checkpoint because the latest checkpoint is before entryTime
await CheckpointTask(StoreType.All, logger: logger);
await CheckpointTask(logger);
}

/// <summary>
/// Take checkpoint
/// </summary>
/// <param name="background"></param>
/// <param name="storeType"></param>
/// <param name="logger"></param>
/// <returns></returns>
public bool TakeCheckpoint(bool background, StoreType storeType = StoreType.All, ILogger logger = null)
public bool TakeCheckpoint(bool background, ILogger logger = null)
{
// Prevent parallel checkpoint
if (!TryPauseCheckpoints()) return false;
if (background)
Task.Run(async () => await CheckpointTask(storeType, logger));
Task.Run(async () => await CheckpointTask(logger));
else
CheckpointTask(storeType, logger).ConfigureAwait(false).GetAwaiter().GetResult();
CheckpointTask(logger).ConfigureAwait(false).GetAwaiter().GetResult();
return true;
}

private async Task CheckpointTask(StoreType storeType, ILogger logger = null)
private async Task CheckpointTask(ILogger logger = null)
{
try
{
Expand All @@ -803,12 +802,11 @@ private async Task CheckpointTask(StoreType storeType, ILogger logger = null)
tryIncremental = false;

var checkpointType = serverOptions.UseFoldOverCheckpoints ? CheckpointType.FoldOver : CheckpointType.Snapshot;
await InitiateCheckpoint(full, checkpointType, tryIncremental, storeType, logger);
await InitiateCheckpoint(full, checkpointType, tryIncremental, logger);
if (full)
{
if (storeType is StoreType.Main or StoreType.All)
this.lastSaveStoreTailAddress = lastSaveStoreTailAddress;
if (storeType is StoreType.Object or StoreType.All)
this.lastSaveStoreTailAddress = lastSaveStoreTailAddress;
if (objectStore != null)
this.lastSaveObjectStoreTailAddress = lastSaveObjectStoreTailAddress;
}
lastSaveTime = DateTimeOffset.UtcNow;
Expand All @@ -823,9 +821,9 @@ private async Task CheckpointTask(StoreType storeType, ILogger logger = null)
}
}

private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, bool tryIncremental, StoreType storeType, ILogger logger = null)
private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, bool tryIncremental, ILogger logger = null)
{
logger?.LogInformation("Initiating checkpoint; full = {full}, type = {checkpointType}, tryIncremental = {tryIncremental}, storeType = {storeType}", full, checkpointType, tryIncremental, storeType);
logger?.LogInformation("Initiating checkpoint; full = {full}, type = {checkpointType}, tryIncremental = {tryIncremental}", full, checkpointType, tryIncremental);

long CheckpointCoveredAofAddress = 0;
if (appendOnlyFile != null)
Expand All @@ -839,41 +837,47 @@ private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType,
logger?.LogInformation("Will truncate AOF to {tailAddress} after checkpoint (files deleted after next commit)", CheckpointCoveredAofAddress);
}

(bool success, Guid token) storeCheckpointResult = default;
(bool success, Guid token) objectStoreCheckpointResult = default;
(bool success, Guid token) checkpointResult = default;
IStateMachine sm;
if (full)
{
if (storeType is StoreType.Main or StoreType.All)
{
storeCheckpointResult = await store.TakeFullCheckpointAsync(checkpointType);
if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion);
}

if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All))
{
objectStoreCheckpointResult = await objectStore.TakeFullCheckpointAsync(checkpointType);
if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion);
}
sm = objectStore == null ?
Checkpoint.Full(store, checkpointType, out checkpointResult.token) :
Checkpoint.Full(store, objectStore, checkpointType, out checkpointResult.token);
}
else
{
if (storeType is StoreType.Main or StoreType.All)
tryIncremental &= store.CanTakeIncrementalCheckpoint(checkpointType, out checkpointResult.token);
if (objectStore != null)
tryIncremental &= objectStore.CanTakeIncrementalCheckpoint(checkpointType, out var guid2) & checkpointResult.token == guid2;

if (tryIncremental)
{
storeCheckpointResult = await store.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental);
if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion);
sm = objectStore == null ?
Checkpoint.IncrementalHybridLogOnly(store, checkpointResult.token) :
Checkpoint.IncrementalHybridLogOnly(store, objectStore, checkpointResult.token);
}

if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All))
else
{
objectStoreCheckpointResult = await objectStore.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental);
if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion);
sm = objectStore == null ?
Checkpoint.HybridLogOnly(store, checkpointType, out checkpointResult.token) :
Checkpoint.HybridLogOnly(store, objectStore, checkpointType, out checkpointResult.token);
}
}

checkpointResult.success = await serverOptions.StateMachineDriver.RunAsync(sm);

if (serverOptions.EnableCluster && clusterProvider.IsPrimary())
{
EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion);
if (objectStore != null)
EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion);
}

// If cluster is enabled the replication manager is responsible for truncating AOF
if (serverOptions.EnableCluster && serverOptions.EnableAOF)
{
clusterProvider.SafeTruncateAOF(storeType, full, CheckpointCoveredAofAddress, storeCheckpointResult.token, objectStoreCheckpointResult.token);
clusterProvider.SafeTruncateAOF(StoreType.All, full, CheckpointCoveredAofAddress, checkpointResult.token, checkpointResult.token);
}
else
{
Expand Down
40 changes: 30 additions & 10 deletions libs/server/Transaction/TransactionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,23 +292,43 @@ internal void GetKeysForValidation(byte* recvBufferPtr, out ArgSlice[] keys, out
readOnly = keyEntries.IsReadOnly;
}

void BeginLockable(StoreType transactionStoreType)
{
while (true)
{
if (transactionStoreType is StoreType.All or StoreType.Main)
{
lockableContext.BeginLockable();
}
if (transactionStoreType is StoreType.All or StoreType.Object)
{
if (objectStoreBasicContext.IsNull)
throw new Exception("Trying to perform object store transaction with object store disabled");
if (objectStoreLockableContext.TryBeginLockable())
{
// If we managed to begin lockable for the object store, we MUST be in the same version as the main store
break;
}
objectStoreLockableContext.Refresh();
if (transactionStoreType is StoreType.All or StoreType.Main)
{
lockableContext.EndLockable();
lockableContext.Refresh();
}
continue;
}
break;
}
}

internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, TimeSpan lock_timeout = default)
{
// Save watch keys to lock list
if (!internal_txn)
watchContainer.SaveKeysToLock(this);

// Acquire lock sessions
if (transactionStoreType == StoreType.All || transactionStoreType == StoreType.Main)
{
lockableContext.BeginLockable();
}
if (transactionStoreType == StoreType.All || transactionStoreType == StoreType.Object)
{
if (objectStoreBasicContext.IsNull)
throw new Exception("Trying to perform object store transaction with object store disabled");
objectStoreLockableContext.BeginLockable();
}
BeginLockable(transactionStoreType);

bool lockSuccess;
if (fail_fast_on_lock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public interface IStreamingSnapshotIteratorFunctions<TKey, TValue>
/// <summary>Iteration is starting.</summary>
/// <param name="checkpointToken">Checkpoint token</param>
/// <param name="currentVersion">Current version of database</param>
/// <param name="targetVersion">Target version of database</param>
/// <param name="nextVersion">Next version of database</param>
/// <returns>True to continue iteration, else false</returns>
bool OnStart(Guid checkpointToken, long currentVersion, long targetVersion);
bool OnStart(Guid checkpointToken, long currentVersion, long nextVersion);

/// <summary>Next record in the streaming snapshot.</summary>
/// <param name="key">Reference to the current record's key</param>
Expand Down
21 changes: 21 additions & 0 deletions libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,27 @@ internal void AcquireLockable<TSessionFunctions>(TSessionFunctions sessionFuncti
}
}

internal bool TryAcquireLockable<TSessionFunctions>(TSessionFunctions sessionFunctions)
where TSessionFunctions : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
{
CheckIsNotAcquiredLockable(sessionFunctions);

// Checkpoints cannot complete while we have active locking sessions.
if (IsInPreparePhase())
{
return false;
}

store.IncrementNumLockingSessions();
sessionFunctions.Ctx.isAcquiredLockable = true;

if (!IsInPreparePhase())
return true;

InternalReleaseLockable(sessionFunctions);
return false;
}

internal void ReleaseLockable<TSessionFunctions>(TSessionFunctions sessionFunctions)
where TSessionFunctions : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ public interface ILockableContext<TKey>
/// </summary>
void BeginLockable();

/// <summary>
/// Try to begin a series of lock operations on possibly multiple keys; call before any locks are taken.
/// </summary>
bool TryBeginLockable();

/// <summary>
/// Ends a series of lock operations on possibly multiple keys; call after all locks are released.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ internal LockableContext(ClientSession<TKey, TValue, TInput, TOutput, TContext,
/// <inheritdoc/>
public void BeginLockable() => clientSession.AcquireLockable(sessionFunctions);

/// <inheritdoc/>
public bool TryBeginLockable() => clientSession.TryAcquireLockable(sessionFunctions);

/// <inheritdoc/>
public void EndLockable() => clientSession.ReleaseLockable(sessionFunctions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ internal LockableUnsafeContext(ClientSession<TKey, TValue, TInput, TOutput, TCon
/// <inheritdoc/>
public void BeginLockable() => clientSession.AcquireLockable(sessionFunctions);

/// <inheritdoc/>
public bool TryBeginLockable() => clientSession.TryAcquireLockable(sessionFunctions);

/// <inheritdoc/>
public void EndLockable() => clientSession.ReleaseLockable(sessionFunctions);
#endregion Begin/EndLockable
Expand Down
Loading
Loading