diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index ef5fd1a5df..b350d5ce24 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -227,6 +227,7 @@ private void InitializeServer() if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads)) throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}"); + opts.Initialize(loggerFactory); CreateMainStore(clusterFactory, out var checkpointDir); CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker); @@ -324,7 +325,7 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana objectStoreSizeTracker = null; if (!opts.DisableObjects) { - objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"), + objKvSettings = opts.GetObjectStoreSettings(loggerFactory, out var objHeapMemorySize, out var objReadCacheHeapMemorySize); // Run checkpoint on its own thread to control p99 diff --git a/libs/server/AOF/AofProcessor.cs b/libs/server/AOF/AofProcessor.cs index e3f8ee95c4..0472d93f73 100644 --- a/libs/server/AOF/AofProcessor.cs +++ b/libs/server/AOF/AofProcessor.cs @@ -210,11 +210,10 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic break; case AofEntryType.MainStoreCheckpointStartCommit: if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion) - _ = storeWrapper.TakeCheckpoint(false, StoreType.Main, logger); + _ = storeWrapper.TakeCheckpoint(false, logger); break; case AofEntryType.ObjectStoreCheckpointStartCommit: - if (asReplica && header.storeVersion > storeWrapper.objectStore.CurrentVersion) - _ = storeWrapper.TakeCheckpoint(false, StoreType.Object, logger); + // With unified checkpoint, we do not need to take object store checkpoint separately break; case AofEntryType.MainStoreCheckpointEndCommit: case AofEntryType.ObjectStoreCheckpointEndCommit: diff --git a/libs/server/Resp/AdminCommands.cs b/libs/server/Resp/AdminCommands.cs index 5905b61eee..676766a09a 100644 --- a/libs/server/Resp/AdminCommands.cs +++ b/libs/server/Resp/AdminCommands.cs @@ -818,7 +818,7 @@ private bool NetworkSAVE() return AbortWithWrongNumberOfArguments(nameof(RespCommand.SAVE)); } - if (!storeWrapper.TakeCheckpoint(false, StoreType.All, logger)) + if (!storeWrapper.TakeCheckpoint(false, logger)) { while (!RespWriteUtils.TryWriteError("ERR checkpoint already in progress"u8, ref dcurr, dend)) SendAndReset(); @@ -853,7 +853,7 @@ private bool NetworkBGSAVE() return AbortWithWrongNumberOfArguments(nameof(RespCommand.BGSAVE)); } - var success = storeWrapper.TakeCheckpoint(true, StoreType.All, logger); + var success = storeWrapper.TakeCheckpoint(true, logger); if (success) { while (!RespWriteUtils.TryWriteSimpleString("Background saving started"u8, ref dcurr, dend)) diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs index 399578e82d..fd6a0d8f79 100644 --- a/libs/server/Servers/GarnetServerOptions.cs +++ b/libs/server/Servers/GarnetServerOptions.cs @@ -446,6 +446,16 @@ public class GarnetServerOptions : ServerOptions /// public UnixFileMode UnixSocketPermission { get; set; } + /// + /// Epoch instance used by server + /// + public LightEpoch Epoch; + + /// + /// Common state machine driver used by Garnet + /// + public StateMachineDriver StateMachineDriver; + /// /// Constructor /// @@ -454,6 +464,16 @@ public GarnetServerOptions(ILogger logger = null) : base(logger) this.logger = logger; } + /// + /// Initialize Garnet server options + /// + /// + public void Initialize(ILoggerFactory loggerFactory = null) + { + Epoch = new LightEpoch(); + StateMachineDriver = new(Epoch, loggerFactory?.CreateLogger($"StateMachineDriver")); + } + /// /// Get main store settings /// @@ -466,15 +486,16 @@ public KVSettings GetSettings(ILoggerFactory loggerFactory, if (MutablePercent is < 10 or > 95) throw new Exception("MutablePercent must be between 10 and 95"); - KVSettings kvSettings = new(baseDir: null, logger: logger); - var indexCacheLines = IndexSizeCachelines("hash index size", IndexSize); - kvSettings = new() + + KVSettings kvSettings = new() { IndexSize = indexCacheLines * 64L, PreallocateLog = false, MutableFraction = MutablePercent / 100.0, PageSize = 1L << PageSizeBits(), + Epoch = Epoch, + StateMachineDriver = StateMachineDriver, loggerFactory = loggerFactory, logger = loggerFactory?.CreateLogger("TsavoriteKV [main]") }; @@ -618,23 +639,26 @@ public static int MemorySizeBits(string memorySize, string storePageSize, out in /// /// Get KVSettings for the object store log /// - public KVSettings GetObjectStoreSettings(ILogger logger, out long objHeapMemorySize, out long objReadCacheHeapMemorySize) + public KVSettings GetObjectStoreSettings(ILoggerFactory loggerFactory, out long objHeapMemorySize, out long objReadCacheHeapMemorySize) { objReadCacheHeapMemorySize = default; if (ObjectStoreMutablePercent is < 10 or > 95) throw new Exception("ObjectStoreMutablePercent must be between 10 and 95"); - KVSettings kvSettings = new(baseDir: null, logger: logger); - var indexCacheLines = IndexSizeCachelines("object store hash index size", ObjectStoreIndexSize); - kvSettings = new() + KVSettings kvSettings = new() { IndexSize = indexCacheLines * 64L, PreallocateLog = false, MutableFraction = ObjectStoreMutablePercent / 100.0, - PageSize = 1L << ObjectStorePageSizeBits() + PageSize = 1L << ObjectStorePageSizeBits(), + Epoch = Epoch, + StateMachineDriver = StateMachineDriver, + loggerFactory = loggerFactory, + logger = loggerFactory?.CreateLogger("TsavoriteKV [obj]") }; + logger?.LogInformation("[Object Store] Using page size of {PageSize}", PrettySize(kvSettings.PageSize)); logger?.LogInformation("[Object Store] Each page can hold ~{PageSize} key-value pairs of objects", kvSettings.PageSize / 24); diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index 0def6ff075..555fcec299 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -761,28 +761,27 @@ public async Task TakeOnDemandCheckpoint(DateTimeOffset entryTime) } // Necessary to take a checkpoint because the latest checkpoint is before entryTime - await CheckpointTask(StoreType.All, logger: logger); + await CheckpointTask(logger); } /// /// Take checkpoint /// /// - /// /// /// - public bool TakeCheckpoint(bool background, StoreType storeType = StoreType.All, ILogger logger = null) + public bool TakeCheckpoint(bool background, ILogger logger = null) { // Prevent parallel checkpoint if (!TryPauseCheckpoints()) return false; if (background) - Task.Run(async () => await CheckpointTask(storeType, logger)); + Task.Run(async () => await CheckpointTask(logger)); else - CheckpointTask(storeType, logger).ConfigureAwait(false).GetAwaiter().GetResult(); + CheckpointTask(logger).ConfigureAwait(false).GetAwaiter().GetResult(); return true; } - private async Task CheckpointTask(StoreType storeType, ILogger logger = null) + private async Task CheckpointTask(ILogger logger = null) { try { @@ -803,12 +802,11 @@ private async Task CheckpointTask(StoreType storeType, ILogger logger = null) tryIncremental = false; var checkpointType = serverOptions.UseFoldOverCheckpoints ? CheckpointType.FoldOver : CheckpointType.Snapshot; - await InitiateCheckpoint(full, checkpointType, tryIncremental, storeType, logger); + await InitiateCheckpoint(full, checkpointType, tryIncremental, logger); if (full) { - if (storeType is StoreType.Main or StoreType.All) - this.lastSaveStoreTailAddress = lastSaveStoreTailAddress; - if (storeType is StoreType.Object or StoreType.All) + this.lastSaveStoreTailAddress = lastSaveStoreTailAddress; + if (objectStore != null) this.lastSaveObjectStoreTailAddress = lastSaveObjectStoreTailAddress; } lastSaveTime = DateTimeOffset.UtcNow; @@ -823,9 +821,9 @@ private async Task CheckpointTask(StoreType storeType, ILogger logger = null) } } - private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, bool tryIncremental, StoreType storeType, ILogger logger = null) + private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, bool tryIncremental, ILogger logger = null) { - logger?.LogInformation("Initiating checkpoint; full = {full}, type = {checkpointType}, tryIncremental = {tryIncremental}, storeType = {storeType}", full, checkpointType, tryIncremental, storeType); + logger?.LogInformation("Initiating checkpoint; full = {full}, type = {checkpointType}, tryIncremental = {tryIncremental}", full, checkpointType, tryIncremental); long CheckpointCoveredAofAddress = 0; if (appendOnlyFile != null) @@ -839,41 +837,47 @@ private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, logger?.LogInformation("Will truncate AOF to {tailAddress} after checkpoint (files deleted after next commit)", CheckpointCoveredAofAddress); } - (bool success, Guid token) storeCheckpointResult = default; - (bool success, Guid token) objectStoreCheckpointResult = default; + (bool success, Guid token) checkpointResult = default; + IStateMachine sm; if (full) { - if (storeType is StoreType.Main or StoreType.All) - { - storeCheckpointResult = await store.TakeFullCheckpointAsync(checkpointType); - if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion); - } - - if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All)) - { - objectStoreCheckpointResult = await objectStore.TakeFullCheckpointAsync(checkpointType); - if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion); - } + sm = objectStore == null ? + Checkpoint.Full(store, checkpointType, out checkpointResult.token) : + Checkpoint.Full(store, objectStore, checkpointType, out checkpointResult.token); } else { - if (storeType is StoreType.Main or StoreType.All) + tryIncremental &= store.CanTakeIncrementalCheckpoint(checkpointType, out checkpointResult.token); + if (objectStore != null) + tryIncremental &= objectStore.CanTakeIncrementalCheckpoint(checkpointType, out var guid2) & checkpointResult.token == guid2; + + if (tryIncremental) { - storeCheckpointResult = await store.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental); - if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion); + sm = objectStore == null ? + Checkpoint.IncrementalHybridLogOnly(store, checkpointResult.token) : + Checkpoint.IncrementalHybridLogOnly(store, objectStore, checkpointResult.token); } - - if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All)) + else { - objectStoreCheckpointResult = await objectStore.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental); - if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion); + sm = objectStore == null ? + Checkpoint.HybridLogOnly(store, checkpointType, out checkpointResult.token) : + Checkpoint.HybridLogOnly(store, objectStore, checkpointType, out checkpointResult.token); } } + checkpointResult.success = await serverOptions.StateMachineDriver.RunAsync(sm); + + if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) + { + EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion); + if (objectStore != null) + EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion); + } + // If cluster is enabled the replication manager is responsible for truncating AOF if (serverOptions.EnableCluster && serverOptions.EnableAOF) { - clusterProvider.SafeTruncateAOF(storeType, full, CheckpointCoveredAofAddress, storeCheckpointResult.token, objectStoreCheckpointResult.token); + clusterProvider.SafeTruncateAOF(StoreType.All, full, CheckpointCoveredAofAddress, checkpointResult.token, checkpointResult.token); } else { diff --git a/libs/server/Transaction/TransactionManager.cs b/libs/server/Transaction/TransactionManager.cs index 1119cf84db..5e9d0ead44 100644 --- a/libs/server/Transaction/TransactionManager.cs +++ b/libs/server/Transaction/TransactionManager.cs @@ -292,6 +292,35 @@ internal void GetKeysForValidation(byte* recvBufferPtr, out ArgSlice[] keys, out readOnly = keyEntries.IsReadOnly; } + void BeginLockable(StoreType transactionStoreType) + { + while (true) + { + if (transactionStoreType is StoreType.All or StoreType.Main) + { + lockableContext.BeginLockable(); + } + if (transactionStoreType is StoreType.All or StoreType.Object) + { + if (objectStoreBasicContext.IsNull) + throw new Exception("Trying to perform object store transaction with object store disabled"); + if (objectStoreLockableContext.TryBeginLockable()) + { + // If we managed to begin lockable for the object store, we MUST be in the same version as the main store + break; + } + objectStoreLockableContext.Refresh(); + if (transactionStoreType is StoreType.All or StoreType.Main) + { + lockableContext.EndLockable(); + lockableContext.Refresh(); + } + continue; + } + break; + } + } + internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, TimeSpan lock_timeout = default) { // Save watch keys to lock list @@ -299,16 +328,7 @@ internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, Tim watchContainer.SaveKeysToLock(this); // Acquire lock sessions - if (transactionStoreType == StoreType.All || transactionStoreType == StoreType.Main) - { - lockableContext.BeginLockable(); - } - if (transactionStoreType == StoreType.All || transactionStoreType == StoreType.Object) - { - if (objectStoreBasicContext.IsNull) - throw new Exception("Trying to perform object store transaction with object store disabled"); - objectStoreLockableContext.BeginLockable(); - } + BeginLockable(transactionStoreType); bool lockSuccess; if (fail_fast_on_lock) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs index cd95e696be..422353aa74 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/IStreamingSnapshotIteratorFunctions.cs @@ -13,9 +13,9 @@ public interface IStreamingSnapshotIteratorFunctions /// Iteration is starting. /// Checkpoint token /// Current version of database - /// Target version of database + /// Next version of database /// True to continue iteration, else false - bool OnStart(Guid checkpointToken, long currentVersion, long targetVersion); + bool OnStart(Guid checkpointToken, long currentVersion, long nextVersion); /// Next record in the streaming snapshot. /// Reference to the current record's key diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs index 56296b2438..5e11351dd0 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs @@ -67,6 +67,27 @@ internal void AcquireLockable(TSessionFunctions sessionFuncti } } + internal bool TryAcquireLockable(TSessionFunctions sessionFunctions) + where TSessionFunctions : ISessionFunctionsWrapper + { + CheckIsNotAcquiredLockable(sessionFunctions); + + // Checkpoints cannot complete while we have active locking sessions. + if (IsInPreparePhase()) + { + return false; + } + + store.IncrementNumLockingSessions(); + sessionFunctions.Ctx.isAcquiredLockable = true; + + if (!IsInPreparePhase()) + return true; + + InternalReleaseLockable(sessionFunctions); + return false; + } + internal void ReleaseLockable(TSessionFunctions sessionFunctions) where TSessionFunctions : ISessionFunctionsWrapper { diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs index 373c56ae40..e2c455675c 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ILockableContext.cs @@ -19,6 +19,11 @@ public interface ILockableContext /// void BeginLockable(); + /// + /// Try to begin a series of lock operations on possibly multiple keys; call before any locks are taken. + /// + bool TryBeginLockable(); + /// /// Ends a series of lock operations on possibly multiple keys; call after all locks are released. /// diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs index 5658018f4f..d12f5ff867 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs @@ -36,6 +36,9 @@ internal LockableContext(ClientSession public void BeginLockable() => clientSession.AcquireLockable(sessionFunctions); + /// + public bool TryBeginLockable() => clientSession.TryAcquireLockable(sessionFunctions); + /// public void EndLockable() => clientSession.ReleaseLockable(sessionFunctions); diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs index 9dc1fb2c23..303ec9ec6d 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -47,6 +47,9 @@ internal LockableUnsafeContext(ClientSession public void BeginLockable() => clientSession.AcquireLockable(sessionFunctions); + /// + public bool TryBeginLockable() => clientSession.TryAcquireLockable(sessionFunctions); + /// public void EndLockable() => clientSession.ReleaseLockable(sessionFunctions); #endregion Begin/EndLockable diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs index 0c3e54a882..1295be2461 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/Checkpoint.cs @@ -5,9 +5,10 @@ namespace Tsavorite.core { - static class Checkpoint + public static class Checkpoint { - public static IStateMachine Full(TsavoriteKV store, CheckpointType checkpointType, long targetVersion, out Guid guid) + #region Single-store APIs + public static IStateMachine Full(TsavoriteKV store, CheckpointType checkpointType, out Guid guid) where TStoreFunctions : IStoreFunctions where TAllocator : IAllocator { @@ -17,12 +18,12 @@ public static IStateMachine Full(Tsav if (checkpointType == CheckpointType.FoldOver) { var backend = new FoldOverSMTask(store, guid); - return new FullCheckpointSM(targetVersion, indexCheckpointTask, backend); + return new FullCheckpointSM(indexCheckpointTask, backend); } else if (checkpointType == CheckpointType.Snapshot) { var backend = new SnapshotCheckpointSMTask(store, guid); - return new FullCheckpointSM(targetVersion, indexCheckpointTask, backend); + return new FullCheckpointSM(indexCheckpointTask, backend); } else { @@ -30,25 +31,25 @@ public static IStateMachine Full(Tsav } } - public static IStateMachine Streaming(TsavoriteKV store, long targetVersion, out Guid guid) + public static IStateMachine Streaming(TsavoriteKV store, out Guid guid) where TStoreFunctions : IStoreFunctions where TAllocator : IAllocator { guid = Guid.NewGuid(); - var backend = new StreamingSnapshotCheckpointSMTask(targetVersion, store, guid); - return new StreamingSnapshotCheckpointSM(targetVersion, backend); + var backend = new StreamingSnapshotCheckpointSMTask(store, guid); + return new StreamingSnapshotCheckpointSM(backend); } - public static IStateMachine IndexOnly(TsavoriteKV store, long targetVersion, out Guid guid) + public static IStateMachine IndexOnly(TsavoriteKV store, out Guid guid) where TStoreFunctions : IStoreFunctions where TAllocator : IAllocator { guid = Guid.NewGuid(); var indexCheckpointTask = new IndexCheckpointSMTask(store, guid); - return new IndexCheckpointSM(targetVersion, indexCheckpointTask); + return new IndexCheckpointSM(indexCheckpointTask); } - public static IStateMachine HybridLogOnly(TsavoriteKV store, CheckpointType checkpointType, long targetVersion, out Guid guid) + public static IStateMachine HybridLogOnly(TsavoriteKV store, CheckpointType checkpointType, out Guid guid) where TStoreFunctions : IStoreFunctions where TAllocator : IAllocator { @@ -57,12 +58,12 @@ public static IStateMachine HybridLogOnly(store, guid); - return new HybridLogCheckpointSM(targetVersion, backend); + return new HybridLogCheckpointSM(backend); } else if (checkpointType == CheckpointType.Snapshot) { var backend = new SnapshotCheckpointSMTask(store, guid); - return new HybridLogCheckpointSM(targetVersion, backend); + return new HybridLogCheckpointSM(backend); } else { @@ -70,12 +71,119 @@ public static IStateMachine HybridLogOnly(TsavoriteKV store, long targetVersion, Guid guid) + public static IStateMachine IncrementalHybridLogOnly(TsavoriteKV store, Guid guid) where TStoreFunctions : IStoreFunctions where TAllocator : IAllocator { var backend = new IncrementalSnapshotCheckpointSMTask(store, guid); - return new HybridLogCheckpointSM(targetVersion, backend); + return new HybridLogCheckpointSM(backend); } + #endregion + + #region Two-store APIs + public static IStateMachine Full( + TsavoriteKV store1, + TsavoriteKV store2, + CheckpointType checkpointType, out Guid guid) + where TStoreFunctions1 : IStoreFunctions + where TAllocator1 : IAllocator + where TStoreFunctions2 : IStoreFunctions + where TAllocator2 : IAllocator + { + guid = Guid.NewGuid(); + var indexCheckpointTask1 = new IndexCheckpointSMTask(store1, guid); + var indexCheckpointTask2 = new IndexCheckpointSMTask(store2, guid); + + if (checkpointType == CheckpointType.FoldOver) + { + var backend1 = new FoldOverSMTask(store1, guid); + var backend2 = new FoldOverSMTask(store2, guid); + return new FullCheckpointSM(indexCheckpointTask1, indexCheckpointTask2, backend1, backend2); + } + else if (checkpointType == CheckpointType.Snapshot) + { + var backend1 = new SnapshotCheckpointSMTask(store1, guid); + var backend2 = new SnapshotCheckpointSMTask(store2, guid); + return new FullCheckpointSM(indexCheckpointTask1, indexCheckpointTask2, backend1, backend2); + } + else + { + throw new TsavoriteException("Invalid checkpoint type"); + } + } + + public static IStateMachine Streaming( + TsavoriteKV store1, + TsavoriteKV store2, + out Guid guid) + where TStoreFunctions1 : IStoreFunctions + where TAllocator1 : IAllocator + where TStoreFunctions2 : IStoreFunctions + where TAllocator2 : IAllocator + { + guid = Guid.NewGuid(); + var backend1 = new StreamingSnapshotCheckpointSMTask(store1, guid); + var backend2 = new StreamingSnapshotCheckpointSMTask(store2, guid); + return new StreamingSnapshotCheckpointSM(backend1, backend2); + } + + public static IStateMachine IndexOnly( + TsavoriteKV store1, + TsavoriteKV store2, + out Guid guid) + where TStoreFunctions1 : IStoreFunctions + where TAllocator1 : IAllocator + where TStoreFunctions2 : IStoreFunctions + where TAllocator2 : IAllocator + { + guid = Guid.NewGuid(); + var indexCheckpointTask1 = new IndexCheckpointSMTask(store1, guid); + var indexCheckpointTask2 = new IndexCheckpointSMTask(store2, guid); + return new IndexCheckpointSM(indexCheckpointTask1, indexCheckpointTask2); + } + + public static IStateMachine HybridLogOnly( + TsavoriteKV store1, + TsavoriteKV store2, + CheckpointType checkpointType, out Guid guid) + where TStoreFunctions1 : IStoreFunctions + where TAllocator1 : IAllocator + where TStoreFunctions2 : IStoreFunctions + where TAllocator2 : IAllocator + { + guid = Guid.NewGuid(); + + if (checkpointType == CheckpointType.FoldOver) + { + var backend1 = new FoldOverSMTask(store1, guid); + var backend2 = new FoldOverSMTask(store2, guid); + return new HybridLogCheckpointSM(backend1, backend2); + } + else if (checkpointType == CheckpointType.Snapshot) + { + var backend1 = new SnapshotCheckpointSMTask(store1, guid); + var backend2 = new SnapshotCheckpointSMTask(store2, guid); + return new HybridLogCheckpointSM(backend1, backend2); + } + else + { + throw new TsavoriteException("Invalid checkpoint type"); + } + } + + public static IStateMachine IncrementalHybridLogOnly( + TsavoriteKV store1, + TsavoriteKV store2, + Guid guid) + where TStoreFunctions1 : IStoreFunctions + where TAllocator1 : IAllocator + where TStoreFunctions2 : IStoreFunctions + where TAllocator2 : IAllocator + { + var backend1 = new IncrementalSnapshotCheckpointSMTask(store1, guid); + var backend2 = new IncrementalSnapshotCheckpointSMTask(store2, guid); + return new HybridLogCheckpointSM(backend1, backend2); + } + #endregion } } \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs index f35f63373c..8b1889714f 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FullCheckpointSM.cs @@ -9,13 +9,11 @@ namespace Tsavorite.core internal sealed class FullCheckpointSM : HybridLogCheckpointSM { /// - /// Construct a new FullCheckpointStateMachine to use the given set of checkpoint tasks, - /// drawing boundary at targetVersion. + /// Construct a new FullCheckpointStateMachine to use the given set of checkpoint tasks. /// - /// upper limit (inclusive) of the version included /// Tasks - public FullCheckpointSM(long targetVersion = -1, params IStateMachineTask[] tasks) - : base(targetVersion, tasks) + public FullCheckpointSM(params IStateMachineTask[] tasks) + : base(tasks) { } /// diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs index 501f79152b..76dd21734d 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/HybridLogCheckpointSM.cs @@ -11,10 +11,9 @@ internal class HybridLogCheckpointSM : VersionChangeSM /// /// Construct a new HybridLogCheckpointStateMachine with the given tasks. Does not load any tasks by default. /// - /// upper limit (inclusive) of the version included /// The tasks to load onto the state machine - public HybridLogCheckpointSM(long targetVersion, params IStateMachineTask[] tasks) - : base(targetVersion, tasks) { } + public HybridLogCheckpointSM(params IStateMachineTask[] tasks) + : base(tasks) { } /// public override SystemState NextState(SystemState start) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs index 1a300e9bbf..4e1f46a8bf 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexCheckpointSM.cs @@ -11,8 +11,8 @@ internal sealed class IndexCheckpointSM : StateMachineBase /// /// Create a new IndexSnapshotStateMachine /// - public IndexCheckpointSM(long targetVersion = -1, params IStateMachineTask[] tasks) - : base(targetVersion, tasks) + public IndexCheckpointSM(params IStateMachineTask[] tasks) + : base(tasks) { } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs index 0bff11bdcd..d15856d311 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IndexResizeSM.cs @@ -11,7 +11,7 @@ internal sealed class IndexResizeSM : StateMachineBase /// /// Constructs a new IndexResizeStateMachine /// - public IndexResizeSM(int targetVersion = -1, params IStateMachineTask[] tasks) : base(targetVersion, tasks) + public IndexResizeSM(params IStateMachineTask[] tasks) : base(tasks) { } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs index 7b0add4b56..f031e40f8a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineBase.cs @@ -10,17 +10,14 @@ namespace Tsavorite.core internal abstract class StateMachineBase : IStateMachine { readonly IStateMachineTask[] tasks; - protected long toVersion; /// /// Construct a new SynchronizationStateMachine with the given tasks. The order of tasks given is the /// order they are executed on each state machine. /// - /// To version /// The ISynchronizationTasks to run on the state machine - protected StateMachineBase(long toVersion = -1, params IStateMachineTask[] tasks) + protected StateMachineBase(params IStateMachineTask[] tasks) { - this.toVersion = toVersion; this.tasks = tasks; } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs index ea05c8fc19..83293292c9 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs @@ -29,10 +29,10 @@ public class StateMachineDriver public SystemState SystemState => SystemState.Copy(ref systemState); - public StateMachineDriver(LightEpoch epoch, SystemState initialState, ILogger logger = null) + public StateMachineDriver(LightEpoch epoch, ILogger logger = null) { this.epoch = epoch; - this.systemState = initialState; + this.systemState = SystemState.Make(Phase.REST, 1); this.waitingList = []; this.logger = logger; } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs index f8e74f3176..bc0d401e76 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSM.cs @@ -9,11 +9,10 @@ namespace Tsavorite.core class StreamingSnapshotCheckpointSM : VersionChangeSM { /// - /// Construct a new StreamingSnapshotCheckpointStateMachine, drawing boundary at targetVersion. + /// Construct a new StreamingSnapshotCheckpointStateMachine. /// - /// upper limit (inclusive) of the version included - public StreamingSnapshotCheckpointSM(long targetVersion, IStateMachineTask backend) - : base(targetVersion, backend) + public StreamingSnapshotCheckpointSM(params IStateMachineTask[] tasks) + : base(tasks) { } /// diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs index b4d99098c6..f2a5ca871d 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotCheckpointSMTask.cs @@ -17,12 +17,9 @@ sealed class StreamingSnapshotCheckpointSMTask where TAllocator : IAllocator { - readonly long targetVersion; - - public StreamingSnapshotCheckpointSMTask(long targetVersion, TsavoriteKV store, Guid guid) + public StreamingSnapshotCheckpointSMTask(TsavoriteKV store, Guid guid) : base(store, guid) { - this.targetVersion = targetVersion; } /// @@ -31,15 +28,22 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri switch (next.Phase) { case Phase.PREPARE: + base.GlobalBeforeEnteringState(next, stateMachineDriver); store._lastSnapshotCheckpoint.Dispose(); store._hybridLogCheckpointToken = guid; store.InitializeHybridLogCheckpoint(store._hybridLogCheckpointToken, next.Version); - store._hybridLogCheckpoint.info.version = next.Version; - store._hybridLogCheckpoint.info.nextVersion = targetVersion == -1 ? next.Version + 1 : targetVersion; + store._hybridLogCheckpoint.info.nextVersion = next.Version + 1; store.StreamingSnapshotScanPhase1(); break; + + case Phase.IN_PROGRESS: + if (store._hybridLogCheckpoint.info.nextVersion != next.Version) + throw new TsavoriteException($"StreamingSnapshotCheckpointSMTask: IN_PROGRESS phase with incorrect version {next.Version}, expected {store._hybridLogCheckpoint.info.nextVersion}"); + base.GlobalBeforeEnteringState(next, stateMachineDriver); + break; case Phase.WAIT_FLUSH: + base.GlobalBeforeEnteringState(next, stateMachineDriver); var finalLogicalAddress = store.hlogBase.GetTailAddress(); store.StreamingSnapshotScanPhase2(finalLogicalAddress); break; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs index 56694ed8ec..a6dcda90a8 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StreamingSnapshotTsavoriteKV.cs @@ -24,15 +24,15 @@ class ScanPhase1Functions : IScanIteratorFunctions readonly IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions; readonly Guid checkpointToken; readonly long currentVersion; - readonly long targetVersion; + readonly long nextVersion; public long numberOfRecords; - public ScanPhase1Functions(IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions, Guid checkpointToken, long currentVersion, long targetVersion) + public ScanPhase1Functions(IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions, Guid checkpointToken, long currentVersion, long nextVersion) { this.streamingSnapshotIteratorFunctions = streamingSnapshotIteratorFunctions; this.checkpointToken = checkpointToken; this.currentVersion = currentVersion; - this.targetVersion = targetVersion; + this.nextVersion = nextVersion; } /// @@ -52,7 +52,7 @@ public void OnException(Exception exception, long numberOfRecords) /// public bool OnStart(long beginAddress, long endAddress) - => streamingSnapshotIteratorFunctions.OnStart(checkpointToken, currentVersion, targetVersion); + => streamingSnapshotIteratorFunctions.OnStart(checkpointToken, currentVersion, nextVersion); /// public void OnStop(bool completed, long numberOfRecords) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs index 7197760f5b..67802d0f8b 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/VersionChangeSM.cs @@ -11,9 +11,8 @@ internal class VersionChangeSM : StateMachineBase /// /// Construct a new VersionChangeStateMachine with the given tasks. Does not load any tasks by default. /// - /// upper limit (inclusive) of the version included /// The tasks to load onto the state machine - protected VersionChangeSM(long targetVersion = -1, params IStateMachineTask[] tasks) : base(targetVersion, tasks) + protected VersionChangeSM(params IStateMachineTask[] tasks) : base(tasks) { } @@ -28,8 +27,7 @@ public override SystemState NextState(SystemState start) break; case Phase.PREPARE: nextState.Phase = Phase.IN_PROGRESS; - if (toVersion == -1) toVersion = start.Version + 1; - nextState.Version = toVersion; + nextState.Version = start.Version + 1; break; case Phase.IN_PROGRESS: nextState.Phase = Phase.REST; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs b/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs index 43d5eab5a1..bfb6d9468a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs @@ -119,6 +119,16 @@ public sealed class KVSettings : IDisposable /// public RevivificationSettings RevivificationSettings; + /// + /// Epoch instance used by the store + /// + public LightEpoch Epoch = null; + + /// + /// State machine driver for the store + /// + public StateMachineDriver StateMachineDriver = null; + /// /// Create default configuration settings for TsavoriteKV. You need to create and specify LogDevice /// explicitly with this API. diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs index fcd0f5cb87..d89a621798 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs @@ -87,7 +87,7 @@ internal void IncrementNumLockingSessions() /// Store-level user function implementations /// Func to call to create the allocator(s, if doing readcache) public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFunctions, Func allocatorFactory) - : base(kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger("TsavoriteKV Index Overflow buckets")) + : base(kvSettings.Epoch, kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger("TsavoriteKV Index Overflow buckets")) { this.allocatorFactory = allocatorFactory; loggerFactory = kvSettings.loggerFactory; @@ -158,7 +158,7 @@ public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFun LockTable = new OverflowBucketLockTable(this); RevivificationManager = new(this, isFixedLenReviv, kvSettings.RevivificationSettings, logSettings); - stateMachineDriver = new(epoch, SystemState.Make(Phase.REST, 1), kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger($"StateMachineDriver")); + stateMachineDriver = kvSettings.StateMachineDriver ?? new(epoch, kvSettings.logger ?? kvSettings.loggerFactory?.CreateLogger($"StateMachineDriver")); if (kvSettings.TryRecoverLatest) { @@ -181,18 +181,13 @@ public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFun /// /// Checkpoint token /// Checkpoint type - /// - /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger - /// than current version. Actual new version may have version number greater than supplied number. If the supplied - /// number is -1, checkpoint will unconditionally create a new version. - /// /// Iterator for streaming snapshot records /// /// Whether we successfully initiated the checkpoint (initiation may /// fail if we are already taking a checkpoint or performing some other /// operation such as growing the index). Use CompleteCheckpointAsync to wait completion. /// - public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null) + public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null) { IStateMachine stateMachine; @@ -201,11 +196,11 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT if (streamingSnapshotIteratorFunctions is null) throw new TsavoriteException("StreamingSnapshot checkpoint requires a streaming snapshot iterator"); this.streamingSnapshotIteratorFunctions = streamingSnapshotIteratorFunctions; - stateMachine = Checkpoint.Streaming(this, targetVersion, out token); + stateMachine = Checkpoint.Streaming(this, out token); } else { - stateMachine = Checkpoint.Full(this, checkpointType, targetVersion, out token); + stateMachine = Checkpoint.Full(this, checkpointType, out token); } return stateMachineDriver.Register(stateMachine); } @@ -215,11 +210,6 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT /// /// Checkpoint type /// Cancellation token - /// - /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger - /// than current version. Actual new version may have version number greater than supplied number. If the supplied - /// number is -1, checkpoint will unconditionally create a new version. - /// /// Iterator for streaming snapshot records /// /// (bool success, Guid token) @@ -230,9 +220,9 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT /// Await task to complete checkpoint, if initiated successfully /// public async ValueTask<(bool success, Guid token)> TakeFullCheckpointAsync(CheckpointType checkpointType, - CancellationToken cancellationToken = default, long targetVersion = -1, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null) + CancellationToken cancellationToken = default, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null) { - var success = TryInitiateFullCheckpoint(out Guid token, checkpointType, targetVersion, streamingSnapshotIteratorFunctions); + var success = TryInitiateFullCheckpoint(out Guid token, checkpointType, streamingSnapshotIteratorFunctions); if (success) await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false); @@ -247,7 +237,7 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT /// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion. public bool TryInitiateIndexCheckpoint(out Guid token) { - var stateMachine = Checkpoint.IndexOnly(this, -1, out token); + var stateMachine = Checkpoint.IndexOnly(this, out token); return stateMachineDriver.Register(stateMachine); } @@ -279,14 +269,9 @@ public bool TryInitiateIndexCheckpoint(out Guid token) /// Checkpoint token /// Checkpoint type /// For snapshot, try to store as incremental delta over last snapshot - /// - /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger - /// than current version. Actual new version may have version number greater than supplied number. If the supplied - /// number is -1, checkpoint will unconditionally create a new version. - /// /// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion. public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, bool tryIncremental = false, - long targetVersion = -1, IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null) + IStreamingSnapshotIteratorFunctions streamingSnapshotIteratorFunctions = null) { IStateMachine stateMachine; @@ -295,7 +280,7 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp if (streamingSnapshotIteratorFunctions is null) throw new TsavoriteException("StreamingSnapshot checkpoint requires a streaming snapshot iterator"); this.streamingSnapshotIteratorFunctions = streamingSnapshotIteratorFunctions; - stateMachine = Checkpoint.Streaming(this, targetVersion, out token); + stateMachine = Checkpoint.Streaming(this, out token); } else { @@ -307,27 +292,37 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp && !hlog.HasObjectLog; if (incremental) { - stateMachine = Checkpoint.IncrementalHybridLogOnly(this, targetVersion, token); + stateMachine = Checkpoint.IncrementalHybridLogOnly(this, token); } else { - stateMachine = Checkpoint.HybridLogOnly(this, checkpointType, targetVersion, out token); + stateMachine = Checkpoint.HybridLogOnly(this, checkpointType, out token); } } return stateMachineDriver.Register(stateMachine); } + /// + /// Whether we can take an incremental snapshot checkpoint given current state of the store + /// + /// + /// + public bool CanTakeIncrementalCheckpoint(CheckpointType checkpointType, out Guid guid) + { + guid = _lastSnapshotCheckpoint.info.guid; + return + checkpointType == CheckpointType.Snapshot + && guid != default + && _lastSnapshotCheckpoint.info.finalLogicalAddress > hlogBase.FlushedUntilAddress + && !hlog.HasObjectLog; + } + /// /// Take log-only checkpoint /// /// Checkpoint type /// For snapshot, try to store as incremental delta over last snapshot /// Cancellation token - /// - /// intended version number of the next version. Checkpoint will not execute if supplied version is not larger - /// than current version. Actual new version may have version number greater than supplied number. If the supplied - /// number is -1, checkpoint will unconditionally create a new version. - /// /// /// (bool success, Guid token) /// success: Whether we successfully initiated the checkpoint (initiation may @@ -337,9 +332,9 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp /// Await task to complete checkpoint, if initiated successfully /// public async ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType, - bool tryIncremental = false, CancellationToken cancellationToken = default, long targetVersion = -1) + bool tryIncremental = false, CancellationToken cancellationToken = default) { - var success = TryInitiateHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental, targetVersion); + var success = TryInitiateHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental); if (success) await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false); @@ -585,7 +580,7 @@ public async Task GrowIndexAsync() throw new TsavoriteException("Cannot use GrowIndex when using non-async sessions"); var indexResizeTask = new IndexResizeSMTask(this); - var indexResizeSM = new IndexResizeSM(-1, indexResizeTask); + var indexResizeSM = new IndexResizeSM(indexResizeTask); return await stateMachineDriver.RunAsync(indexResizeSM); } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs index 12efdb2850..d8ac3e5824 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/TsavoriteBase.cs @@ -52,9 +52,9 @@ public unsafe partial class TsavoriteBase /// /// Constructor /// - public TsavoriteBase(ILogger logger = null) + public TsavoriteBase(LightEpoch epoch = null, ILogger logger = null) { - epoch = new LightEpoch(); + this.epoch = epoch ?? new LightEpoch(); overflowBucketsAllocator = new MallocFixedPageSize(logger); } diff --git a/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs b/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs index 4df0676d56..a13d60f9d9 100644 --- a/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs +++ b/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs @@ -958,9 +958,9 @@ public SnapshotIterator(TsavoriteKV(new MyFunctions()); bc2 = session2.BasicContext; return true; diff --git a/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs b/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs index 13e695812c..7ccd56b414 100644 --- a/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs +++ b/libs/storage/Tsavorite/cs/test/StateMachineBarrierTests.cs @@ -101,8 +101,7 @@ public void StateMachineBarrierTest1() void Prepare(out SMSimpleFunctions f, out ClientSession s1, out UnsafeContext uc1, - out ThreadSession s2, - long toVersion = -1) + out ThreadSession s2) { f = new SMSimpleFunctions(); @@ -141,7 +140,7 @@ void Prepare(out SMSimpleFunctions f, // We should be in REST, 1 ClassicAssert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 1), store.SystemState)); - _ = store.TryInitiateHybridLogCheckpoint(out _, CheckpointType.FoldOver, targetVersion: toVersion); + _ = store.TryInitiateHybridLogCheckpoint(out _, CheckpointType.FoldOver); // Wait for PREPARE phase while (!SystemState.Equal(SystemState.Make(Phase.PREPARE, 1), store.SystemState)) diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs index a8075f0716..389fa5d55d 100644 --- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs @@ -236,7 +236,11 @@ public void ClusterDBVersionAlignmentDisklessSync([Values] bool disableObjects, // Validate db version var primaryVersion = context.clusterTestUtils.GetStoreCurrentVersion(primaryIndex, isMainStore: true, logger: context.logger); var replicaOneVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaOneIndex, isMainStore: true, logger: context.logger); - ClassicAssert.AreEqual(2, primaryVersion); + + // With unified store, versions increase per scan (main and object) + // so expected versions depend on whether objects are disabled or not + var expectedVersion1 = disableObjects ? 2 : 3; + ClassicAssert.AreEqual(expectedVersion1, primaryVersion); ClassicAssert.AreEqual(primaryVersion, replicaOneVersion); // Reset and re-attach replica as primary @@ -258,9 +262,13 @@ public void ClusterDBVersionAlignmentDisklessSync([Values] bool disableObjects, primaryVersion = context.clusterTestUtils.GetStoreCurrentVersion(primaryIndex, isMainStore: true, logger: context.logger); replicaOneVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaOneIndex, isMainStore: true, logger: context.logger); var replicaTwoVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaTwoIndex, isMainStore: true, logger: context.logger); - ClassicAssert.AreEqual(3, primaryVersion); + + // With unified store, versions increase per scan (main and object) + // so expected versions depend on whether objects are disabled or not + var expectedVersion2 = disableObjects ? 3 : 5; + ClassicAssert.AreEqual(expectedVersion2, primaryVersion); ClassicAssert.AreEqual(primaryVersion, replicaTwoVersion); - ClassicAssert.AreEqual(2, replicaOneVersion); + ClassicAssert.AreEqual(expectedVersion1, replicaOneVersion); // Re-attach first replica _ = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaOneIndex, primaryNodeIndex: primaryIndex, logger: context.logger); @@ -271,7 +279,11 @@ public void ClusterDBVersionAlignmentDisklessSync([Values] bool disableObjects, primaryVersion = context.clusterTestUtils.GetStoreCurrentVersion(primaryIndex, isMainStore: true, logger: context.logger); replicaOneVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaOneIndex, isMainStore: true, logger: context.logger); replicaTwoVersion = context.clusterTestUtils.GetStoreCurrentVersion(replicaTwoIndex, isMainStore: true, logger: context.logger); - ClassicAssert.AreEqual(4, primaryVersion); + + // With unified store, versions increase per scan (main and object) + // so expected versions depend on whether objects are disabled or not + var expectedVersion3 = disableObjects ? 4 : 7; + ClassicAssert.AreEqual(expectedVersion3, primaryVersion); ClassicAssert.AreEqual(primaryVersion, replicaOneVersion); ClassicAssert.AreEqual(primaryVersion, replicaTwoVersion); }