diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs
index ef5fd1a5df..b350d5ce24 100644
--- a/libs/host/GarnetServer.cs
+++ b/libs/host/GarnetServer.cs
@@ -227,6 +227,7 @@ private void InitializeServer()
if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads))
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");
+ opts.Initialize(loggerFactory);
CreateMainStore(clusterFactory, out var checkpointDir);
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);
@@ -324,7 +325,7 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana
objectStoreSizeTracker = null;
if (!opts.DisableObjects)
{
- objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
+ objKvSettings = opts.GetObjectStoreSettings(loggerFactory,
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);
// Run checkpoint on its own thread to control p99
diff --git a/libs/server/AOF/AofProcessor.cs b/libs/server/AOF/AofProcessor.cs
index e3f8ee95c4..0472d93f73 100644
--- a/libs/server/AOF/AofProcessor.cs
+++ b/libs/server/AOF/AofProcessor.cs
@@ -210,11 +210,10 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic
break;
case AofEntryType.MainStoreCheckpointStartCommit:
if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion)
- _ = storeWrapper.TakeCheckpoint(false, StoreType.Main, logger);
+ _ = storeWrapper.TakeCheckpoint(false, logger);
break;
case AofEntryType.ObjectStoreCheckpointStartCommit:
- if (asReplica && header.storeVersion > storeWrapper.objectStore.CurrentVersion)
- _ = storeWrapper.TakeCheckpoint(false, StoreType.Object, logger);
+ // With unified checkpoint, we do not need to take object store checkpoint separately
break;
case AofEntryType.MainStoreCheckpointEndCommit:
case AofEntryType.ObjectStoreCheckpointEndCommit:
diff --git a/libs/server/Resp/AdminCommands.cs b/libs/server/Resp/AdminCommands.cs
index 5905b61eee..676766a09a 100644
--- a/libs/server/Resp/AdminCommands.cs
+++ b/libs/server/Resp/AdminCommands.cs
@@ -818,7 +818,7 @@ private bool NetworkSAVE()
return AbortWithWrongNumberOfArguments(nameof(RespCommand.SAVE));
}
- if (!storeWrapper.TakeCheckpoint(false, StoreType.All, logger))
+ if (!storeWrapper.TakeCheckpoint(false, logger))
{
while (!RespWriteUtils.TryWriteError("ERR checkpoint already in progress"u8, ref dcurr, dend))
SendAndReset();
@@ -853,7 +853,7 @@ private bool NetworkBGSAVE()
return AbortWithWrongNumberOfArguments(nameof(RespCommand.BGSAVE));
}
- var success = storeWrapper.TakeCheckpoint(true, StoreType.All, logger);
+ var success = storeWrapper.TakeCheckpoint(true, logger);
if (success)
{
while (!RespWriteUtils.TryWriteSimpleString("Background saving started"u8, ref dcurr, dend))
diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs
index 399578e82d..fd6a0d8f79 100644
--- a/libs/server/Servers/GarnetServerOptions.cs
+++ b/libs/server/Servers/GarnetServerOptions.cs
@@ -446,6 +446,16 @@ public class GarnetServerOptions : ServerOptions
///
public UnixFileMode UnixSocketPermission { get; set; }
+ ///
+ /// Epoch instance used by server
+ ///
+ public LightEpoch Epoch;
+
+ ///
+ /// Common state machine driver used by Garnet
+ ///
+ public StateMachineDriver StateMachineDriver;
+
///
/// Constructor
///
@@ -454,6 +464,16 @@ public GarnetServerOptions(ILogger logger = null) : base(logger)
this.logger = logger;
}
+ ///
+ /// Initialize Garnet server options
+ ///
+ ///
+ public void Initialize(ILoggerFactory loggerFactory = null)
+ {
+ Epoch = new LightEpoch();
+ StateMachineDriver = new(Epoch, loggerFactory?.CreateLogger($"StateMachineDriver"));
+ }
+
///
/// Get main store settings
///
@@ -466,15 +486,16 @@ public KVSettings GetSettings(ILoggerFactory loggerFactory,
if (MutablePercent is < 10 or > 95)
throw new Exception("MutablePercent must be between 10 and 95");
- KVSettings kvSettings = new(baseDir: null, logger: logger);
-
var indexCacheLines = IndexSizeCachelines("hash index size", IndexSize);
- kvSettings = new()
+
+ KVSettings kvSettings = new()
{
IndexSize = indexCacheLines * 64L,
PreallocateLog = false,
MutableFraction = MutablePercent / 100.0,
PageSize = 1L << PageSizeBits(),
+ Epoch = Epoch,
+ StateMachineDriver = StateMachineDriver,
loggerFactory = loggerFactory,
logger = loggerFactory?.CreateLogger("TsavoriteKV [main]")
};
@@ -618,23 +639,26 @@ public static int MemorySizeBits(string memorySize, string storePageSize, out in
///
/// Get KVSettings for the object store log
///
- public KVSettings GetObjectStoreSettings(ILogger logger, out long objHeapMemorySize, out long objReadCacheHeapMemorySize)
+ public KVSettings GetObjectStoreSettings(ILoggerFactory loggerFactory, out long objHeapMemorySize, out long objReadCacheHeapMemorySize)
{
objReadCacheHeapMemorySize = default;
if (ObjectStoreMutablePercent is < 10 or > 95)
throw new Exception("ObjectStoreMutablePercent must be between 10 and 95");
- KVSettings kvSettings = new(baseDir: null, logger: logger);
-
var indexCacheLines = IndexSizeCachelines("object store hash index size", ObjectStoreIndexSize);
- kvSettings = new()
+ KVSettings kvSettings = new()
{
IndexSize = indexCacheLines * 64L,
PreallocateLog = false,
MutableFraction = ObjectStoreMutablePercent / 100.0,
- PageSize = 1L << ObjectStorePageSizeBits()
+ PageSize = 1L << ObjectStorePageSizeBits(),
+ Epoch = Epoch,
+ StateMachineDriver = StateMachineDriver,
+ loggerFactory = loggerFactory,
+ logger = loggerFactory?.CreateLogger("TsavoriteKV [obj]")
};
+
logger?.LogInformation("[Object Store] Using page size of {PageSize}", PrettySize(kvSettings.PageSize));
logger?.LogInformation("[Object Store] Each page can hold ~{PageSize} key-value pairs of objects", kvSettings.PageSize / 24);
diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs
index 0def6ff075..555fcec299 100644
--- a/libs/server/StoreWrapper.cs
+++ b/libs/server/StoreWrapper.cs
@@ -761,28 +761,27 @@ public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime)
}
// Necessary to take a checkpoint because the latest checkpoint is before entryTime
- await CheckpointTask(StoreType.All, logger: logger);
+ await CheckpointTask(logger);
}
///
/// Take checkpoint
///
///
- ///
///
///
- public bool TakeCheckpoint(bool background, StoreType storeType = StoreType.All, ILogger logger = null)
+ public bool TakeCheckpoint(bool background, ILogger logger = null)
{
// Prevent parallel checkpoint
if (!TryPauseCheckpoints()) return false;
if (background)
- Task.Run(async () => await CheckpointTask(storeType, logger));
+ Task.Run(async () => await CheckpointTask(logger));
else
- CheckpointTask(storeType, logger).ConfigureAwait(false).GetAwaiter().GetResult();
+ CheckpointTask(logger).ConfigureAwait(false).GetAwaiter().GetResult();
return true;
}
- private async Task CheckpointTask(StoreType storeType, ILogger logger = null)
+ private async Task CheckpointTask(ILogger logger = null)
{
try
{
@@ -803,12 +802,11 @@ private async Task CheckpointTask(StoreType storeType, ILogger logger = null)
tryIncremental = false;
var checkpointType = serverOptions.UseFoldOverCheckpoints ? CheckpointType.FoldOver : CheckpointType.Snapshot;
- await InitiateCheckpoint(full, checkpointType, tryIncremental, storeType, logger);
+ await InitiateCheckpoint(full, checkpointType, tryIncremental, logger);
if (full)
{
- if (storeType is StoreType.Main or StoreType.All)
- this.lastSaveStoreTailAddress = lastSaveStoreTailAddress;
- if (storeType is StoreType.Object or StoreType.All)
+ this.lastSaveStoreTailAddress = lastSaveStoreTailAddress;
+ if (objectStore != null)
this.lastSaveObjectStoreTailAddress = lastSaveObjectStoreTailAddress;
}
lastSaveTime = DateTimeOffset.UtcNow;
@@ -823,9 +821,9 @@ private async Task CheckpointTask(StoreType storeType, ILogger logger = null)
}
}
- private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, bool tryIncremental, StoreType storeType, ILogger logger = null)
+ private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, bool tryIncremental, ILogger logger = null)
{
- logger?.LogInformation("Initiating checkpoint; full = {full}, type = {checkpointType}, tryIncremental = {tryIncremental}, storeType = {storeType}", full, checkpointType, tryIncremental, storeType);
+ logger?.LogInformation("Initiating checkpoint; full = {full}, type = {checkpointType}, tryIncremental = {tryIncremental}", full, checkpointType, tryIncremental);
long CheckpointCoveredAofAddress = 0;
if (appendOnlyFile != null)
@@ -839,41 +837,47 @@ private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType,
logger?.LogInformation("Will truncate AOF to {tailAddress} after checkpoint (files deleted after next commit)", CheckpointCoveredAofAddress);
}
- (bool success, Guid token) storeCheckpointResult = default;
- (bool success, Guid token) objectStoreCheckpointResult = default;
+ (bool success, Guid token) checkpointResult = default;
+ IStateMachine sm;
if (full)
{
- if (storeType is StoreType.Main or StoreType.All)
- {
- storeCheckpointResult = await store.TakeFullCheckpointAsync(checkpointType);
- if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion);
- }
-
- if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All))
- {
- objectStoreCheckpointResult = await objectStore.TakeFullCheckpointAsync(checkpointType);
- if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion);
- }
+ sm = objectStore == null ?
+ Checkpoint.Full(store, checkpointType, out checkpointResult.token) :
+ Checkpoint.Full(store, objectStore, checkpointType, out checkpointResult.token);
}
else
{
- if (storeType is StoreType.Main or StoreType.All)
+ tryIncremental &= store.CanTakeIncrementalCheckpoint(checkpointType, out checkpointResult.token);
+ if (objectStore != null)
+ tryIncremental &= objectStore.CanTakeIncrementalCheckpoint(checkpointType, out var guid2) & checkpointResult.token == guid2;
+
+ if (tryIncremental)
{
- storeCheckpointResult = await store.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental);
- if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion);
+ sm = objectStore == null ?
+ Checkpoint.IncrementalHybridLogOnly(store, checkpointResult.token) :
+ Checkpoint.IncrementalHybridLogOnly(store, objectStore, checkpointResult.token);
}
-
- if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All))
+ else
{
- objectStoreCheckpointResult = await objectStore.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental);
- if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion);
+ sm = objectStore == null ?
+ Checkpoint.HybridLogOnly(store, checkpointType, out checkpointResult.token) :
+ Checkpoint.HybridLogOnly(store, objectStore, checkpointType, out checkpointResult.token);
}
}
+ checkpointResult.success = await serverOptions.StateMachineDriver.RunAsync(sm);
+
+ if (serverOptions.EnableCluster && clusterProvider.IsPrimary())
+ {
+ EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion);
+ if (objectStore != null)
+ EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion);
+ }
+
// If cluster is enabled the replication manager is responsible for truncating AOF
if (serverOptions.EnableCluster && serverOptions.EnableAOF)
{
- clusterProvider.SafeTruncateAOF(storeType, full, CheckpointCoveredAofAddress, storeCheckpointResult.token, objectStoreCheckpointResult.token);
+ clusterProvider.SafeTruncateAOF(StoreType.All, full, CheckpointCoveredAofAddress, checkpointResult.token, checkpointResult.token);
}
else
{
diff --git a/libs/server/Transaction/TransactionManager.cs b/libs/server/Transaction/TransactionManager.cs
index 1119cf84db..5e9d0ead44 100644
--- a/libs/server/Transaction/TransactionManager.cs
+++ b/libs/server/Transaction/TransactionManager.cs
@@ -292,6 +292,35 @@ internal void GetKeysForValidation(byte* recvBufferPtr, out ArgSlice[] keys, out
readOnly = keyEntries.IsReadOnly;
}
+ void BeginLockable(StoreType transactionStoreType)
+ {
+ while (true)
+ {
+ if (transactionStoreType is StoreType.All or StoreType.Main)
+ {
+ lockableContext.BeginLockable();
+ }
+ if (transactionStoreType is StoreType.All or StoreType.Object)
+ {
+ if (objectStoreBasicContext.IsNull)
+ throw new Exception("Trying to perform object store transaction with object store disabled");
+ if (objectStoreLockableContext.TryBeginLockable())
+ {
+ // If we managed to begin lockable for the object store, we MUST be in the same version as the main store
+ break;
+ }
+ objectStoreLockableContext.Refresh();
+ if (transactionStoreType is StoreType.All or StoreType.Main)
+ {
+ lockableContext.EndLockable();
+ lockableContext.Refresh();
+ }
+ continue;
+ }
+ break;
+ }
+ }
+
internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, TimeSpan lock_timeout = default)
{
// Save watch keys to lock list
@@ -299,16 +328,7 @@ internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, Tim
watchContainer.SaveKeysToLock(this);
// Acquire lock sessions
- if (transactionStoreType == StoreType.All || transactionStoreType == StoreType.Main)
- {
- lockableContext.BeginLockable();
- }
- if (transactionStoreType == StoreType.All || transactionStoreType == StoreType.Object)
- {
- if (objectStoreBasicContext.IsNull)
- throw new Exception("Trying to perform object store transaction with object store disabled");
- objectStoreLockableContext.BeginLockable();
- }
+ BeginLockable(transactionStoreType);
bool lockSuccess;
if (fail_fast_on_lock)
diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs
index cd95e696be..422353aa74 100644
--- a/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs
@@ -13,9 +13,9 @@ public interface IStreamingSnapshotIteratorFunctions
/// Iteration is starting.
/// Checkpoint token
/// Current version of database
- /// Target version of database
+ /// Next version of database
/// True to continue iteration, else false
- bool OnStart(Guid checkpointToken, long currentVersion, long targetVersion);
+ bool OnStart(Guid checkpointToken, long currentVersion, long nextVersion);
/// Next record in the streaming snapshot.
/// Reference to the current record's key
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
index 56296b2438..5e11351dd0 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
@@ -67,6 +67,27 @@ internal void AcquireLockable(TSessionFunctions sessionFuncti
}
}
+ internal bool TryAcquireLockable(TSessionFunctions sessionFunctions)
+ where TSessionFunctions : ISessionFunctionsWrapper
+ {
+ CheckIsNotAcquiredLockable(sessionFunctions);
+
+ // Checkpoints cannot complete while we have active locking sessions.
+ if (IsInPreparePhase())
+ {
+ return false;
+ }
+
+ store.IncrementNumLockingSessions();
+ sessionFunctions.Ctx.isAcquiredLockable = true;
+
+ if (!IsInPreparePhase())
+ return true;
+
+ InternalReleaseLockable(sessionFunctions);
+ return false;
+ }
+
internal void ReleaseLockable(TSessionFunctions sessionFunctions)
where TSessionFunctions : ISessionFunctionsWrapper
{
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs
index 373c56ae40..e2c455675c 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs
@@ -19,6 +19,11 @@ public interface ILockableContext
///
void BeginLockable();
+ ///
+ /// Try to begin a series of lock operations on possibly multiple keys; call before any locks are taken.
+ ///
+ bool TryBeginLockable();
+
///
/// Ends a series of lock operations on possibly multiple keys; call after all locks are released.
///
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
index 5658018f4f..d12f5ff867 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
@@ -36,6 +36,9 @@ internal LockableContext(ClientSession
public void BeginLockable() => clientSession.AcquireLockable(sessionFunctions);
+ ///
+ public bool TryBeginLockable() => clientSession.TryAcquireLockable(sessionFunctions);
+
///
public void EndLockable() => clientSession.ReleaseLockable(sessionFunctions);
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs
index 9dc1fb2c23..303ec9ec6d 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs
@@ -47,6 +47,9 @@ internal LockableUnsafeContext(ClientSession
public void BeginLockable() => clientSession.AcquireLockable(sessionFunctions);
+ ///
+ public bool TryBeginLockable() => clientSession.TryAcquireLockable(sessionFunctions);
+
///
public void EndLockable() => clientSession.ReleaseLockable(sessionFunctions);
#endregion Begin/EndLockable
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs
index 0c3e54a882..1295be2461 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs
@@ -5,9 +5,10 @@
namespace Tsavorite.core
{
- static class Checkpoint
+ public static class Checkpoint
{
- public static IStateMachine Full(TsavoriteKV store, CheckpointType checkpointType, long targetVersion, out Guid guid)
+ #region Single-store APIs
+ public static IStateMachine Full(TsavoriteKV store, CheckpointType checkpointType, out Guid guid)
where TStoreFunctions : IStoreFunctions
where TAllocator : IAllocator
{
@@ -17,12 +18,12 @@ public static IStateMachine Full(Tsav
if (checkpointType == CheckpointType.FoldOver)
{
var backend = new FoldOverSMTask(store, guid);
- return new FullCheckpointSM(targetVersion, indexCheckpointTask, backend);
+ return new FullCheckpointSM(indexCheckpointTask, backend);
}
else if (checkpointType == CheckpointType.Snapshot)
{
var backend = new SnapshotCheckpointSMTask(store, guid);
- return new FullCheckpointSM(targetVersion, indexCheckpointTask, backend);
+ return new FullCheckpointSM(indexCheckpointTask, backend);
}
else
{
@@ -30,25 +31,25 @@ public static IStateMachine Full(Tsav
}
}
- public static IStateMachine Streaming(TsavoriteKV store, long targetVersion, out Guid guid)
+ public static IStateMachine Streaming(TsavoriteKV store, out Guid guid)
where TStoreFunctions : IStoreFunctions
where TAllocator : IAllocator
{
guid = Guid.NewGuid();
- var backend = new StreamingSnapshotCheckpointSMTask(targetVersion, store, guid);
- return new StreamingSnapshotCheckpointSM(targetVersion, backend);
+ var backend = new StreamingSnapshotCheckpointSMTask(store, guid);
+ return new StreamingSnapshotCheckpointSM(backend);
}
- public static IStateMachine IndexOnly(TsavoriteKV store, long targetVersion, out Guid guid)
+ public static IStateMachine IndexOnly(TsavoriteKV store, out Guid guid)
where TStoreFunctions : IStoreFunctions
where TAllocator : IAllocator
{
guid = Guid.NewGuid();
var indexCheckpointTask = new IndexCheckpointSMTask(store, guid);
- return new IndexCheckpointSM(targetVersion, indexCheckpointTask);
+ return new IndexCheckpointSM(indexCheckpointTask);
}
- public static IStateMachine HybridLogOnly(TsavoriteKV store, CheckpointType checkpointType, long targetVersion, out Guid guid)
+ public static IStateMachine HybridLogOnly(TsavoriteKV store, CheckpointType checkpointType, out Guid guid)
where TStoreFunctions : IStoreFunctions
where TAllocator : IAllocator
{
@@ -57,12 +58,12 @@ public static IStateMachine HybridLogOnly(store, guid);
- return new HybridLogCheckpointSM(targetVersion, backend);
+ return new HybridLogCheckpointSM(backend);
}
else if (checkpointType == CheckpointType.Snapshot)
{
var backend = new SnapshotCheckpointSMTask(store, guid);
- return new HybridLogCheckpointSM(targetVersion, backend);
+ return new HybridLogCheckpointSM(backend);
}
else
{
@@ -70,12 +71,119 @@ public static IStateMachine HybridLogOnly(TsavoriteKV store, long targetVersion, Guid guid)
+ public static IStateMachine IncrementalHybridLogOnly(TsavoriteKV store, Guid guid)
where TStoreFunctions : IStoreFunctions
where TAllocator : IAllocator
{
var backend = new IncrementalSnapshotCheckpointSMTask(store, guid);
- return new HybridLogCheckpointSM(targetVersion, backend);
+ return new HybridLogCheckpointSM(backend);
}
+ #endregion
+
+ #region Two-store APIs
+ public static IStateMachine Full(
+ TsavoriteKV store1,
+ TsavoriteKV store2,
+ CheckpointType checkpointType, out Guid guid)
+ where TStoreFunctions1 : IStoreFunctions
+ where TAllocator1 : IAllocator
+ where TStoreFunctions2 : IStoreFunctions
+ where TAllocator2 : IAllocator
+ {
+ guid = Guid.NewGuid();
+ var indexCheckpointTask1 = new IndexCheckpointSMTask(store1, guid);
+ var indexCheckpointTask2 = new IndexCheckpointSMTask(store2, guid);
+
+ if (checkpointType == CheckpointType.FoldOver)
+ {
+ var backend1 = new FoldOverSMTask(store1, guid);
+ var backend2 = new FoldOverSMTask(store2, guid);
+ return new FullCheckpointSM(indexCheckpointTask1, indexCheckpointTask2, backend1, backend2);
+ }
+ else if (checkpointType == CheckpointType.Snapshot)
+ {
+ var backend1 = new SnapshotCheckpointSMTask(store1, guid);
+ var backend2 = new SnapshotCheckpointSMTask(store2, guid);
+ return new FullCheckpointSM(indexCheckpointTask1, indexCheckpointTask2, backend1, backend2);
+ }
+ else
+ {
+ throw new TsavoriteException("Invalid checkpoint type");
+ }
+ }
+
+ public static IStateMachine Streaming(
+ TsavoriteKV store1,
+ TsavoriteKV store2,
+ out Guid guid)
+ where TStoreFunctions1 : IStoreFunctions
+ where TAllocator1 : IAllocator
+ where TStoreFunctions2 : IStoreFunctions
+ where TAllocator2 : IAllocator
+ {
+ guid = Guid.NewGuid();
+ var backend1 = new StreamingSnapshotCheckpointSMTask(store1, guid);
+ var backend2 = new StreamingSnapshotCheckpointSMTask(store2, guid);
+ return new StreamingSnapshotCheckpointSM(backend1, backend2);
+ }
+
+ public static IStateMachine IndexOnly(
+ TsavoriteKV store1,
+ TsavoriteKV store2,
+ out Guid guid)
+ where TStoreFunctions1 : IStoreFunctions
+ where TAllocator1 : IAllocator
+ where TStoreFunctions2 : IStoreFunctions
+ where TAllocator2 : IAllocator
+ {
+ guid = Guid.NewGuid();
+ var indexCheckpointTask1 = new IndexCheckpointSMTask(store1, guid);
+ var indexCheckpointTask2 = new IndexCheckpointSMTask(store2, guid);
+ return new IndexCheckpointSM(indexCheckpointTask1, indexCheckpointTask2);
+ }
+
+ public static IStateMachine HybridLogOnly(
+ TsavoriteKV store1,
+ TsavoriteKV store2,
+ CheckpointType checkpointType, out Guid guid)
+ where TStoreFunctions1 : IStoreFunctions
+ where TAllocator1 : IAllocator
+ where TStoreFunctions2 : IStoreFunctions
+ where TAllocator2 : IAllocator
+ {
+ guid = Guid.NewGuid();
+
+ if (checkpointType == CheckpointType.FoldOver)
+ {
+ var backend1 = new FoldOverSMTask(store1, guid);
+ var backend2 = new FoldOverSMTask(store2, guid);
+ return new HybridLogCheckpointSM(backend1, backend2);
+ }
+ else if (checkpointType == CheckpointType.Snapshot)
+ {
+ var backend1 = new SnapshotCheckpointSMTask(store1, guid);
+ var backend2 = new SnapshotCheckpointSMTask(store2, guid);
+ return new HybridLogCheckpointSM(backend1, backend2);
+ }
+ else
+ {
+ throw new TsavoriteException("Invalid checkpoint type");
+ }
+ }
+
+ public static IStateMachine IncrementalHybridLogOnly(
+ TsavoriteKV store1,
+ TsavoriteKV store2,
+ Guid guid)
+ where TStoreFunctions1 : IStoreFunctions
+ where TAllocator1 : IAllocator
+ where TStoreFunctions2 : IStoreFunctions
+ where TAllocator2 : IAllocator
+ {
+ var backend1 = new IncrementalSnapshotCheckpointSMTask(store1, guid);
+ var backend2 = new IncrementalSnapshotCheckpointSMTask(store2, guid);
+ return new HybridLogCheckpointSM(backend1, backend2);
+ }
+ #endregion
}
}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs
index f35f63373c..8b1889714f 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs
@@ -9,13 +9,11 @@ namespace Tsavorite.core
internal sealed class FullCheckpointSM : HybridLogCheckpointSM
{
///
- /// Construct a new FullCheckpointStateMachine to use the given set of checkpoint tasks,
- /// drawing boundary at targetVersion.
+ /// Construct a new FullCheckpointStateMachine to use the given set of checkpoint tasks.
///
- /// upper limit (inclusive) of the version included
/// Tasks
- public FullCheckpointSM(long targetVersion = -1, params IStateMachineTask[] tasks)
- : base(targetVersion, tasks)
+ public FullCheckpointSM(params IStateMachineTask[] tasks)
+ : base(tasks)
{ }
///
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs
index 501f79152b..76dd21734d 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs
@@ -11,10 +11,9 @@ internal class HybridLogCheckpointSM : VersionChangeSM
///
/// Construct a new HybridLogCheckpointStateMachine with the given tasks. Does not load any tasks by default.
///
- /// upper limit (inclusive) of the version included
/// The tasks to load onto the state machine
- public HybridLogCheckpointSM(long targetVersion, params IStateMachineTask[] tasks)
- : base(targetVersion, tasks) { }
+ public HybridLogCheckpointSM(params IStateMachineTask[] tasks)
+ : base(tasks) { }
///
public override SystemState NextState(SystemState start)
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs
index 1a300e9bbf..4e1f46a8bf 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs
@@ -11,8 +11,8 @@ internal sealed class IndexCheckpointSM : StateMachineBase
///
/// Create a new IndexSnapshotStateMachine
///
- public IndexCheckpointSM(long targetVersion = -1, params IStateMachineTask[] tasks)
- : base(targetVersion, tasks)
+ public IndexCheckpointSM(params IStateMachineTask[] tasks)
+ : base(tasks)
{
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs
index 0bff11bdcd..d15856d311 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs
@@ -11,7 +11,7 @@ internal sealed class IndexResizeSM : StateMachineBase
///
/// Constructs a new IndexResizeStateMachine
///
- public IndexResizeSM(int targetVersion = -1, params IStateMachineTask[] tasks) : base(targetVersion, tasks)
+ public IndexResizeSM(params IStateMachineTask[] tasks) : base(tasks)
{
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs
index 7b0add4b56..f031e40f8a 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs
@@ -10,17 +10,14 @@ namespace Tsavorite.core
internal abstract class StateMachineBase : IStateMachine
{
readonly IStateMachineTask[] tasks;
- protected long toVersion;
///
/// Construct a new SynchronizationStateMachine with the given tasks. The order of tasks given is the
/// order they are executed on each state machine.
///
- /// To version
/// The ISynchronizationTasks to run on the state machine
- protected StateMachineBase(long toVersion = -1, params IStateMachineTask[] tasks)
+ protected StateMachineBase(params IStateMachineTask[] tasks)
{
- this.toVersion = toVersion;
this.tasks = tasks;
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs
index ea05c8fc19..83293292c9 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs
@@ -29,10 +29,10 @@ public class StateMachineDriver
public SystemState SystemState => SystemState.Copy(ref systemState);
- public StateMachineDriver(LightEpoch epoch, SystemState initialState, ILogger logger = null)
+ public StateMachineDriver(LightEpoch epoch, ILogger logger = null)
{
this.epoch = epoch;
- this.systemState = initialState;
+ this.systemState = SystemState.Make(Phase.REST, 1);
this.waitingList = [];
this.logger = logger;
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs
index f8e74f3176..bc0d401e76 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs
@@ -9,11 +9,10 @@ namespace Tsavorite.core
class StreamingSnapshotCheckpointSM : VersionChangeSM
{
///
- /// Construct a new StreamingSnapshotCheckpointStateMachine, drawing boundary at targetVersion.
+ /// Construct a new StreamingSnapshotCheckpointStateMachine.
///
- /// upper limit (inclusive) of the version included
- public StreamingSnapshotCheckpointSM(long targetVersion, IStateMachineTask backend)
- : base(targetVersion, backend)
+ public StreamingSnapshotCheckpointSM(params IStateMachineTask[] tasks)
+ : base(tasks)
{ }
///
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs
index b4d99098c6..f2a5ca871d 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs
@@ -17,12 +17,9 @@ sealed class StreamingSnapshotCheckpointSMTask
where TAllocator : IAllocator
{
- readonly long targetVersion;
-
- public StreamingSnapshotCheckpointSMTask(long targetVersion, TsavoriteKV store, Guid guid)
+ public StreamingSnapshotCheckpointSMTask(TsavoriteKV store, Guid guid)
: base(store, guid)
{
- this.targetVersion = targetVersion;
}
///
@@ -31,15 +28,22 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
switch (next.Phase)
{
case Phase.PREPARE:
+ base.GlobalBeforeEnteringState(next, stateMachineDriver);
store._lastSnapshotCheckpoint.Dispose();
store._hybridLogCheckpointToken = guid;
store.InitializeHybridLogCheckpoint(store._hybridLogCheckpointToken, next.Version);
- store._hybridLogCheckpoint.info.version = next.Version;
- store._hybridLogCheckpoint.info.nextVersion = targetVersion == -1 ? next.Version + 1 : targetVersion;
+ store._hybridLogCheckpoint.info.nextVersion = next.Version + 1;
store.StreamingSnapshotScanPhase1();
break;
+
+ case Phase.IN_PROGRESS:
+ if (store._hybridLogCheckpoint.info.nextVersion != next.Version)
+ throw new TsavoriteException($"StreamingSnapshotCheckpointSMTask: IN_PROGRESS phase with incorrect version {next.Version}, expected {store._hybridLogCheckpoint.info.nextVersion}");
+ base.GlobalBeforeEnteringState(next, stateMachineDriver);
+ break;
case Phase.WAIT_FLUSH:
+ base.GlobalBeforeEnteringState(next, stateMachineDriver);
var finalLogicalAddress = store.hlogBase.GetTailAddress();
store.StreamingSnapshotScanPhase2(finalLogicalAddress);
break;
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs
index 56694ed8ec..a6dcda90a8 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs
@@ -24,15 +24,15 @@ class ScanPhase1Functions : IScanIteratorFunctions
readonly IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions;
readonly Guid checkpointToken;
readonly long currentVersion;
- readonly long targetVersion;
+ readonly long nextVersion;
public long numberOfRecords;
- public ScanPhase1Functions(IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions, Guid checkpointToken, long currentVersion, long targetVersion)
+ public ScanPhase1Functions(IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions, Guid checkpointToken, long currentVersion, long nextVersion)
{
this.streamingSnapshotIteratorFunctions = streamingSnapshotIteratorFunctions;
this.checkpointToken = checkpointToken;
this.currentVersion = currentVersion;
- this.targetVersion = targetVersion;
+ this.nextVersion = nextVersion;
}
///
@@ -52,7 +52,7 @@ public void OnException(Exception exception, long numberOfRecords)
///
public bool OnStart(long beginAddress, long endAddress)
- => streamingSnapshotIteratorFunctions.OnStart(checkpointToken, currentVersion, targetVersion);
+ => streamingSnapshotIteratorFunctions.OnStart(checkpointToken, currentVersion, nextVersion);
///
public void OnStop(bool completed, long numberOfRecords)
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs
index 7197760f5b..67802d0f8b 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs
@@ -11,9 +11,8 @@ internal class VersionChangeSM : StateMachineBase
///
/// Construct a new VersionChangeStateMachine with the given tasks. Does not load any tasks by default.
///
- /// upper limit (inclusive) of the version included
/// The tasks to load onto the state machine
- protected VersionChangeSM(long targetVersion = -1, params IStateMachineTask[] tasks) : base(targetVersion, tasks)
+ protected VersionChangeSM(params IStateMachineTask[] tasks) : base(tasks)
{
}
@@ -28,8 +27,7 @@ public override SystemState NextState(SystemState start)
break;
case Phase.PREPARE:
nextState.Phase = Phase.IN_PROGRESS;
- if (toVersion == -1) toVersion = start.Version + 1;
- nextState.Version = toVersion;
+ nextState.Version = start.Version + 1;
break;
case Phase.IN_PROGRESS:
nextState.Phase = Phase.REST;
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs b/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs
index 43d5eab5a1..bfb6d9468a 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs
@@ -119,6 +119,16 @@ public sealed class KVSettings : IDisposable
///
public RevivificationSettings RevivificationSettings;
+ ///
+ /// Epoch instance used by the store
+ ///
+ public LightEpoch Epoch = null;
+
+ ///
+ /// State machine driver for the store
+ ///
+ public StateMachineDriver StateMachineDriver = null;
+
///
/// Create default configuration settings for TsavoriteKV. You need to create and specify LogDevice
/// explicitly with this API.
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
index fcd0f5cb87..d89a621798 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
@@ -87,7 +87,7 @@ internal void IncrementNumLockingSessions()
/// Store-level user function implementations
/// Func to call to create the allocator(s, if doing readcache)
public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFunctions, Func allocatorFactory)
- : base(kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger("TsavoriteKV Index Overflow buckets"))
+ : base(kvSettings.Epoch, kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger("TsavoriteKV Index Overflow buckets"))
{
this.allocatorFactory = allocatorFactory;
loggerFactory = kvSettings.loggerFactory;
@@ -158,7 +158,7 @@ public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFun
LockTable = new OverflowBucketLockTable(this);
RevivificationManager = new(this, isFixedLenReviv, kvSettings.RevivificationSettings, logSettings);
- stateMachineDriver = new(epoch, SystemState.Make(Phase.REST, 1), kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger($"StateMachineDriver"));
+ stateMachineDriver = kvSettings.StateMachineDriver ?? new(epoch, kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger($"StateMachineDriver"));
if (kvSettings.TryRecoverLatest)
{
@@ -181,18 +181,13 @@ public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFun
///
/// Checkpoint token
/// Checkpoint type
- ///
- /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
- /// than current version. Actual new version may have version number greater than supplied number. If the supplied
- /// number is -1, checkpoint will unconditionally create a new version.
- ///
/// Iterator for streaming snapshot records
///
/// Whether we successfully initiated the checkpoint (initiation may
/// fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index). Use CompleteCheckpointAsync to wait completion.
///
- public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null)
+ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null)
{
IStateMachine stateMachine;
@@ -201,11 +196,11 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
if (streamingSnapshotIteratorFunctions is null)
throw new TsavoriteException("StreamingSnapshot checkpoint requires a streaming snapshot iterator");
this.streamingSnapshotIteratorFunctions = streamingSnapshotIteratorFunctions;
- stateMachine = Checkpoint.Streaming(this, targetVersion, out token);
+ stateMachine = Checkpoint.Streaming(this, out token);
}
else
{
- stateMachine = Checkpoint.Full(this, checkpointType, targetVersion, out token);
+ stateMachine = Checkpoint.Full(this, checkpointType, out token);
}
return stateMachineDriver.Register(stateMachine);
}
@@ -215,11 +210,6 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
///
/// Checkpoint type
/// Cancellation token
- ///
- /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
- /// than current version. Actual new version may have version number greater than supplied number. If the supplied
- /// number is -1, checkpoint will unconditionally create a new version.
- ///
/// Iterator for streaming snapshot records
///
/// (bool success, Guid token)
@@ -230,9 +220,9 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
/// Await task to complete checkpoint, if initiated successfully
///
public async ValueTask<(bool success, Guid token)> TakeFullCheckpointAsync(CheckpointType checkpointType,
- CancellationToken cancellationToken = default, long targetVersion = -1, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null)
+ CancellationToken cancellationToken = default, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null)
{
- var success = TryInitiateFullCheckpoint(out Guid token, checkpointType, targetVersion, streamingSnapshotIteratorFunctions);
+ var success = TryInitiateFullCheckpoint(out Guid token, checkpointType, streamingSnapshotIteratorFunctions);
if (success)
await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false);
@@ -247,7 +237,7 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
/// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.
public bool TryInitiateIndexCheckpoint(out Guid token)
{
- var stateMachine = Checkpoint.IndexOnly(this, -1, out token);
+ var stateMachine = Checkpoint.IndexOnly(this, out token);
return stateMachineDriver.Register(stateMachine);
}
@@ -279,14 +269,9 @@ public bool TryInitiateIndexCheckpoint(out Guid token)
/// Checkpoint token
/// Checkpoint type
/// For snapshot, try to store as incremental delta over last snapshot
- ///
- /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
- /// than current version. Actual new version may have version number greater than supplied number. If the supplied
- /// number is -1, checkpoint will unconditionally create a new version.
- ///
/// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.
public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, bool tryIncremental = false,
- long targetVersion = -1, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null)
+ IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null)
{
IStateMachine stateMachine;
@@ -295,7 +280,7 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp
if (streamingSnapshotIteratorFunctions is null)
throw new TsavoriteException("StreamingSnapshot checkpoint requires a streaming snapshot iterator");
this.streamingSnapshotIteratorFunctions = streamingSnapshotIteratorFunctions;
- stateMachine = Checkpoint.Streaming(this, targetVersion, out token);
+ stateMachine = Checkpoint.Streaming(this, out token);
}
else
{
@@ -307,27 +292,37 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp
&& !hlog.HasObjectLog;
if (incremental)
{
- stateMachine = Checkpoint.IncrementalHybridLogOnly(this, targetVersion, token);
+ stateMachine = Checkpoint.IncrementalHybridLogOnly(this, token);
}
else
{
- stateMachine = Checkpoint.HybridLogOnly(this, checkpointType, targetVersion, out token);
+ stateMachine = Checkpoint.HybridLogOnly(this, checkpointType, out token);
}
}
return stateMachineDriver.Register(stateMachine);
}
+ ///
+ /// Whether we can take an incremental snapshot checkpoint given current state of the store
+ ///
+ ///
+ ///
+ public bool CanTakeIncrementalCheckpoint(CheckpointType checkpointType, out Guid guid)
+ {
+ guid = _lastSnapshotCheckpoint.info.guid;
+ return
+ checkpointType == CheckpointType.Snapshot
+ && guid != default
+ && _lastSnapshotCheckpoint.info.finalLogicalAddress > hlogBase.FlushedUntilAddress
+ && !hlog.HasObjectLog;
+ }
+
///
/// Take log-only checkpoint
///
/// Checkpoint type
/// For snapshot, try to store as incremental delta over last snapshot
/// Cancellation token
- ///
- /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
- /// than current version. Actual new version may have version number greater than supplied number. If the supplied
- /// number is -1, checkpoint will unconditionally create a new version.
- ///
///
/// (bool success, Guid token)
/// success: Whether we successfully initiated the checkpoint (initiation may
@@ -337,9 +332,9 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp
/// Await task to complete checkpoint, if initiated successfully
///
public async ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType,
- bool tryIncremental = false, CancellationToken cancellationToken = default, long targetVersion = -1)
+ bool tryIncremental = false, CancellationToken cancellationToken = default)
{
- var success = TryInitiateHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental, targetVersion);
+ var success = TryInitiateHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental);
if (success)
await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false);
@@ -585,7 +580,7 @@ public async Task GrowIndexAsync()
throw new TsavoriteException("Cannot use GrowIndex when using non-async sessions");
var indexResizeTask = new IndexResizeSMTask(this);
- var indexResizeSM = new IndexResizeSM(-1, indexResizeTask);
+ var indexResizeSM = new IndexResizeSM(indexResizeTask);
return await stateMachineDriver.RunAsync(indexResizeSM);
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs
index 12efdb2850..d8ac3e5824 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs
@@ -52,9 +52,9 @@ public unsafe partial class TsavoriteBase
///
/// Constructor
///
- public TsavoriteBase(ILogger logger = null)
+ public TsavoriteBase(LightEpoch epoch = null, ILogger logger = null)
{
- epoch = new LightEpoch();
+ this.epoch = epoch ?? new LightEpoch();
overflowBucketsAllocator = new MallocFixedPageSize(logger);
}
diff --git a/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs b/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs
index 4df0676d56..a13d60f9d9 100644
--- a/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs
+++ b/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs
@@ -958,9 +958,9 @@ public SnapshotIterator(TsavoriteKV(new MyFunctions());
bc2 = session2.BasicContext;
return true;
diff --git a/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs b/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs
index 13e695812c..7ccd56b414 100644
--- a/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs
+++ b/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs
@@ -101,8 +101,7 @@ public void StateMachineBarrierTest1()
void Prepare(out SMSimpleFunctions f,
out ClientSession s1,
out UnsafeContext uc1,
- out ThreadSession s2,
- long toVersion = -1)
+ out ThreadSession s2)
{
f = new SMSimpleFunctions();
@@ -141,7 +140,7 @@ void Prepare(out SMSimpleFunctions f,
// We should be in REST, 1
ClassicAssert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 1), store.SystemState));
- _ = store.TryInitiateHybridLogCheckpoint(out _, CheckpointType.FoldOver, targetVersion: toVersion);
+ _ = store.TryInitiateHybridLogCheckpoint(out _, CheckpointType.FoldOver);
// Wait for PREPARE phase
while (!SystemState.Equal(SystemState.Make(Phase.PREPARE, 1), store.SystemState))
diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs
index a8075f0716..389fa5d55d 100644
--- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs
+++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs
@@ -236,7 +236,11 @@ public void ClusterDBVersionAlignmentDisklessSync([Values] bool disableObjects,
// Validate db version
var primaryVersion = context.clusterTestUtils.GetStoreCurrentVersion(primaryIndex, isMainStore: true, logger: context.logger);
var replicaOneVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaOneIndex, isMainStore: true, logger: context.logger);
- ClassicAssert.AreEqual(2, primaryVersion);
+
+ // With unified store, versions increase per scan (main and object)
+ // so expected versions depend on whether objects are disabled or not
+ var expectedVersion1 = disableObjects ? 2 : 3;
+ ClassicAssert.AreEqual(expectedVersion1, primaryVersion);
ClassicAssert.AreEqual(primaryVersion, replicaOneVersion);
// Reset and re-attach replica as primary
@@ -258,9 +262,13 @@ public void ClusterDBVersionAlignmentDisklessSync([Values] bool disableObjects,
primaryVersion = context.clusterTestUtils.GetStoreCurrentVersion(primaryIndex, isMainStore: true, logger: context.logger);
replicaOneVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaOneIndex, isMainStore: true, logger: context.logger);
var replicaTwoVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaTwoIndex, isMainStore: true, logger: context.logger);
- ClassicAssert.AreEqual(3, primaryVersion);
+
+ // With unified store, versions increase per scan (main and object)
+ // so expected versions depend on whether objects are disabled or not
+ var expectedVersion2 = disableObjects ? 3 : 5;
+ ClassicAssert.AreEqual(expectedVersion2, primaryVersion);
ClassicAssert.AreEqual(primaryVersion, replicaTwoVersion);
- ClassicAssert.AreEqual(2, replicaOneVersion);
+ ClassicAssert.AreEqual(expectedVersion1, replicaOneVersion);
// Re-attach first replica
_ = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaOneIndex, primaryNodeIndex: primaryIndex, logger: context.logger);
@@ -271,7 +279,11 @@ public void ClusterDBVersionAlignmentDisklessSync([Values] bool disableObjects,
primaryVersion = context.clusterTestUtils.GetStoreCurrentVersion(primaryIndex, isMainStore: true, logger: context.logger);
replicaOneVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaOneIndex, isMainStore: true, logger: context.logger);
replicaTwoVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaTwoIndex, isMainStore: true, logger: context.logger);
- ClassicAssert.AreEqual(4, primaryVersion);
+
+ // With unified store, versions increase per scan (main and object)
+ // so expected versions depend on whether objects are disabled or not
+ var expectedVersion3 = disableObjects ? 4 : 7;
+ ClassicAssert.AreEqual(expectedVersion3, primaryVersion);
ClassicAssert.AreEqual(primaryVersion, replicaOneVersion);
ClassicAssert.AreEqual(primaryVersion, replicaTwoVersion);
}