From e711d78a94dd2e0fb4d604f386d3a3b9e0292ce4 Mon Sep 17 00:00:00 2001 From: Vasileios Zois <96085550+vazois@users.noreply.github.com> Date: Fri, 7 Mar 2025 10:05:54 -0800 Subject: [PATCH] Fix Checkpoint Version Shift for Diskless Replication (#1068) * correctly add commit marker for streaming checkpoint in the AOF * enqueue only on checkpoint version shift * remove default value * rename checkpoint start marker * add end marker and change enqueue * guard checkpoint end marker commit using EnableCluster flag --- .../ReplicationSnapshotIterator.cs | 6 +- .../Server/Replication/ReplicationManager.cs | 5 +- libs/server/AOF/AofEntryType.cs | 68 +++++++++++++++++-- libs/server/AOF/AofProcessor.cs | 44 ++++++------ libs/server/StoreWrapper.cs | 23 ++++--- 5 files changed, 107 insertions(+), 39 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs b/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs index 36cdcdf9c9..f304d346da 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs @@ -26,7 +26,6 @@ internal sealed unsafe class SnapshotIteratorManager long currentFlushEventCount = 0; long lastFlushEventCount = 0; - public long CheckpointCoveredAddress { get; private set; } public SnapshotIteratorManager(ReplicationSyncManager replicationSyncManager, CancellationToken cancellationToken, ILogger logger = null) @@ -195,8 +194,9 @@ public void OnStop(bool completed, long numberOfRecords, bool isMainStore, long // Wait for flush and response to complete replicationSyncManager.WaitForFlush().GetAwaiter().GetResult(); - // Enqueue version change commit - replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(isMainStore, targetVersion); + // Enqueue commit end marker + var entryType = isMainStore ? AofEntryType.MainStoreStreamingCheckpointEndCommit : AofEntryType.ObjectStoreStreamingCheckpointEndCommit; + replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(entryType, targetVersion); logger?.LogTrace("{OnStop} {store} {numberOfRecords} {targetVersion}", nameof(OnStop), isMainStore ? "MAIN STORE" : "OBJECT STORE", numberOfRecords, targetVersion); diff --git a/libs/cluster/Server/Replication/ReplicationManager.cs b/libs/cluster/Server/Replication/ReplicationManager.cs index 1920f2dbee..7274257efc 100644 --- a/libs/cluster/Server/Replication/ReplicationManager.cs +++ b/libs/cluster/Server/Replication/ReplicationManager.cs @@ -156,7 +156,10 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion) { if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA) return; - storeWrapper.EnqueueCommit(isMainStore, newVersion, streaming: true); + var entryType = clusterProvider.serverOptions.ReplicaDisklessSync ? + (isMainStore ? AofEntryType.MainStoreStreamingCheckpointStartCommit : AofEntryType.ObjectStoreStreamingCheckpointStartCommit) : + (isMainStore ? AofEntryType.MainStoreCheckpointStartCommit : AofEntryType.ObjectStoreCheckpointStartCommit); + storeWrapper.EnqueueCommit(entryType, newVersion); } /// diff --git a/libs/server/AOF/AofEntryType.cs b/libs/server/AOF/AofEntryType.cs index bfb61f3b61..32e45a8830 100644 --- a/libs/server/AOF/AofEntryType.cs +++ b/libs/server/AOF/AofEntryType.cs @@ -3,21 +3,79 @@ namespace Garnet.server { - enum AofEntryType : byte + public enum AofEntryType : byte { + /// + /// Store upsert + /// StoreUpsert = 0x00, + /// + /// Store RMW + /// StoreRMW = 0x01, + /// + /// Store delete + /// StoreDelete = 0x02, + /// + /// Object store upsert + /// ObjectStoreUpsert = 0x10, + /// + /// Object store RMW + /// ObjectStoreRMW = 0x11, + /// + /// Object store delete + /// ObjectStoreDelete = 0x12, + /// + /// Transaction start + /// TxnStart = 0x20, + /// + /// Transaction commit + /// TxnCommit = 0x21, + /// + /// Transaction abort + /// TxnAbort = 0x22, - MainStoreCheckpointCommit = 0x30, - ObjectStoreCheckpointCommit = 0x31, - MainStoreStreamingCheckpointCommit = 0x40, - ObjectStoreStreamingCheckpointCommit = 0x41, + /// + /// Checkpoint for main store start + /// + MainStoreCheckpointStartCommit = 0x30, + /// + /// Checkpoint for object store start + /// + ObjectStoreCheckpointStartCommit = 0x31, + /// + /// Checkpoint for main store end + /// + MainStoreCheckpointEndCommit = 0x32, + /// + /// Checkpoint for object store end + /// + ObjectStoreCheckpointEndCommit = 0x33, + /// + /// Streaming checkpoint for main store start + /// + MainStoreStreamingCheckpointStartCommit = 0x40, + /// + /// Streaming checkpoint for object store start + /// + ObjectStoreStreamingCheckpointStartCommit = 0x41, + /// + /// Streaming checkpoint for main store end + /// + MainStoreStreamingCheckpointEndCommit = 0x42, + /// + /// Streaming checkpoint for object store end + /// + ObjectStoreStreamingCheckpointEndCommit = 0x43, + /// + /// StoredProcedure + /// StoredProcedure = 0x50, } diff --git a/libs/server/AOF/AofProcessor.cs b/libs/server/AOF/AofProcessor.cs index 1930dc130a..e3f8ee95c4 100644 --- a/libs/server/AOF/AofProcessor.cs +++ b/libs/server/AOF/AofProcessor.cs @@ -200,7 +200,7 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic switch (header.opType) { case AofEntryType.TxnStart: - inflightTxns[header.sessionID] = new List(); + inflightTxns[header.sessionID] = []; break; case AofEntryType.TxnAbort: case AofEntryType.TxnCommit: @@ -208,34 +208,33 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic // after a checkpoint, and the transaction belonged to the previous version. It can safely // be ignored. break; - case AofEntryType.MainStoreCheckpointCommit: - if (asReplica) - { - if (header.storeVersion > storeWrapper.store.CurrentVersion) - { - storeWrapper.TakeCheckpoint(false, StoreType.Main, logger); - } - } + case AofEntryType.MainStoreCheckpointStartCommit: + if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion) + _ = storeWrapper.TakeCheckpoint(false, StoreType.Main, logger); break; - case AofEntryType.ObjectStoreCheckpointCommit: - if (asReplica) - { - if (header.storeVersion > storeWrapper.objectStore.CurrentVersion) - { - storeWrapper.TakeCheckpoint(false, StoreType.Object, logger); - } - } + case AofEntryType.ObjectStoreCheckpointStartCommit: + if (asReplica && header.storeVersion > storeWrapper.objectStore.CurrentVersion) + _ = storeWrapper.TakeCheckpoint(false, StoreType.Object, logger); break; - case AofEntryType.MainStoreStreamingCheckpointCommit: + case AofEntryType.MainStoreCheckpointEndCommit: + case AofEntryType.ObjectStoreCheckpointEndCommit: + break; + case AofEntryType.MainStoreStreamingCheckpointStartCommit: Debug.Assert(storeWrapper.serverOptions.ReplicaDisklessSync); - if (header.storeVersion > storeWrapper.store.CurrentVersion) + if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion) storeWrapper.store.SetVersion(header.storeVersion); break; - case AofEntryType.ObjectStoreStreamingCheckpointCommit: + case AofEntryType.MainStoreStreamingCheckpointEndCommit: + Debug.Assert(storeWrapper.serverOptions.ReplicaDisklessSync); + break; + case AofEntryType.ObjectStoreStreamingCheckpointStartCommit: Debug.Assert(storeWrapper.serverOptions.ReplicaDisklessSync); - if (header.storeVersion > storeWrapper.store.CurrentVersion) + if (asReplica && header.storeVersion > storeWrapper.store.CurrentVersion) storeWrapper.objectStore.SetVersion(header.storeVersion); break; + case AofEntryType.ObjectStoreStreamingCheckpointEndCommit: + Debug.Assert(storeWrapper.serverOptions.ReplicaDisklessSync); + break; default: ReplayOp(ptr); break; @@ -423,7 +422,8 @@ static AofStoreType ToAofStoreType(AofEntryType type) AofEntryType.StoreUpsert or AofEntryType.StoreRMW or AofEntryType.StoreDelete => AofStoreType.MainStoreType, AofEntryType.ObjectStoreUpsert or AofEntryType.ObjectStoreRMW or AofEntryType.ObjectStoreDelete => AofStoreType.ObjectStoreType, AofEntryType.TxnStart or AofEntryType.TxnCommit or AofEntryType.TxnAbort or AofEntryType.StoredProcedure => AofStoreType.TxnType, - AofEntryType.MainStoreCheckpointCommit or AofEntryType.ObjectStoreCheckpointCommit => AofStoreType.CheckpointType, + AofEntryType.MainStoreCheckpointStartCommit or AofEntryType.ObjectStoreCheckpointStartCommit or AofEntryType.MainStoreStreamingCheckpointStartCommit or AofEntryType.ObjectStoreStreamingCheckpointStartCommit => AofStoreType.CheckpointType, + AofEntryType.MainStoreCheckpointEndCommit or AofEntryType.ObjectStoreCheckpointEndCommit or AofEntryType.MainStoreStreamingCheckpointEndCommit or AofEntryType.ObjectStoreStreamingCheckpointEndCommit => AofStoreType.CheckpointType, _ => throw new GarnetException($"Conversion to AofStoreType not possible for {type}"), }; } diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index c2f3298259..358e45b632 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -495,18 +495,13 @@ void DoCompaction() /// /// Append a checkpoint commit to the AOF /// - /// + /// /// - /// - public void EnqueueCommit(bool isMainStore, long version, bool streaming = false) + public void EnqueueCommit(AofEntryType entryType, long version) { - var opType = streaming ? - isMainStore ? AofEntryType.MainStoreStreamingCheckpointCommit : AofEntryType.ObjectStoreStreamingCheckpointCommit : - isMainStore ? AofEntryType.MainStoreCheckpointCommit : AofEntryType.ObjectStoreCheckpointCommit; - AofHeader header = new() { - opType = isMainStore ? AofEntryType.MainStoreCheckpointCommit : AofEntryType.ObjectStoreCheckpointCommit, + opType = entryType, storeVersion = version, sessionID = -1 }; @@ -849,18 +844,30 @@ private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType, if (full) { if (storeType is StoreType.Main or StoreType.All) + { storeCheckpointResult = await store.TakeFullCheckpointAsync(checkpointType); + if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion); + } if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All)) + { objectStoreCheckpointResult = await objectStore.TakeFullCheckpointAsync(checkpointType); + if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion); + } } else { if (storeType is StoreType.Main or StoreType.All) + { storeCheckpointResult = await store.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental); + if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.MainStoreCheckpointEndCommit, store.CurrentVersion); + } if (objectStore != null && (storeType == StoreType.Object || storeType == StoreType.All)) + { objectStoreCheckpointResult = await objectStore.TakeHybridLogCheckpointAsync(checkpointType, tryIncremental); + if (serverOptions.EnableCluster && clusterProvider.IsPrimary()) EnqueueCommit(AofEntryType.ObjectStoreCheckpointEndCommit, objectStore.CurrentVersion); + } } // If cluster is enabled the replication manager is responsible for truncating AOF