diff --git a/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs b/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs index 36cdcdf9c9..30a04798d3 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs @@ -195,9 +195,6 @@ public void OnStop(bool completed, long numberOfRecords, bool isMainStore, long // Wait for flush and response to complete replicationSyncManager.WaitForFlush().GetAwaiter().GetResult(); - // Enqueue version change commit - replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(isMainStore, targetVersion); - logger?.LogTrace("{OnStop} {store} {numberOfRecords} {targetVersion}", nameof(OnStop), isMainStore ? "MAIN STORE" : "OBJECT STORE", numberOfRecords, targetVersion); diff --git a/libs/cluster/Server/Replication/ReplicationManager.cs b/libs/cluster/Server/Replication/ReplicationManager.cs index 1920f2dbee..072bffba82 100644 --- a/libs/cluster/Server/Replication/ReplicationManager.cs +++ b/libs/cluster/Server/Replication/ReplicationManager.cs @@ -156,7 +156,7 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion) { if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA) return; - storeWrapper.EnqueueCommit(isMainStore, newVersion, streaming: true); + storeWrapper.EnqueueCommit(isMainStore, newVersion, diskless: clusterProvider.serverOptions.ReplicaDisklessSync); } /// diff --git a/libs/server/Databases/DatabaseManagerBase.cs b/libs/server/Databases/DatabaseManagerBase.cs index cd76d602b2..55ea871735 100644 --- a/libs/server/Databases/DatabaseManagerBase.cs +++ b/libs/server/Databases/DatabaseManagerBase.cs @@ -113,7 +113,7 @@ public abstract Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit, public abstract void ResetRevivificationStats(); /// - public abstract void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false); + public abstract void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false); /// public abstract GarnetDatabase[] GetDatabasesSnapshot(); diff --git a/libs/server/Databases/IDatabaseManager.cs b/libs/server/Databases/IDatabaseManager.cs index fcfe0b5e8d..5d0516bde5 100644 --- a/libs/server/Databases/IDatabaseManager.cs +++ b/libs/server/Databases/IDatabaseManager.cs @@ -188,8 +188,8 @@ public Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit, Cancellati /// /// /// - /// - public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false); + /// + public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false); /// /// Get a snapshot of all active databases diff --git a/libs/server/Databases/MultiDatabaseManager.cs b/libs/server/Databases/MultiDatabaseManager.cs index 16f18e1c02..11e90774dd 100644 --- a/libs/server/Databases/MultiDatabaseManager.cs +++ b/libs/server/Databases/MultiDatabaseManager.cs @@ -540,12 +540,12 @@ public override void ResetRevivificationStats() } } - public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false) + public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false) { if (!TryGetOrAddDatabase(dbId, out var db)) throw new GarnetException($"Database with ID {dbId} was not found."); - EnqueueDatabaseCommit(ref db, isMainStore, version, streaming); + EnqueueDatabaseCommit(ref db, isMainStore, version, diskless); } /// diff --git a/libs/server/Databases/SingleDatabaseManager.cs b/libs/server/Databases/SingleDatabaseManager.cs index 51c4cf7f31..d2a320718f 100644 --- a/libs/server/Databases/SingleDatabaseManager.cs +++ b/libs/server/Databases/SingleDatabaseManager.cs @@ -284,11 +284,11 @@ public override void ResetRevivificationStats() } /// - public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false) + public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false) { ArgumentOutOfRangeException.ThrowIfNotEqual(dbId, 0); - EnqueueDatabaseCommit(ref DefaultDatabase, isMainStore, version, streaming); + EnqueueDatabaseCommit(ref DefaultDatabase, isMainStore, version, diskless); } public override GarnetDatabase[] GetDatabasesSnapshot() => [DefaultDatabase]; diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index 5249abc8b7..5ee268eea9 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -325,9 +325,9 @@ public void RecoverCheckpoint(bool replicaRecover = false, bool recoverMainStore /// /// /// - /// - public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false) => - this.databaseManager.EnqueueCommit(isMainStore, version, dbId, streaming); + /// + public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false) => + this.databaseManager.EnqueueCommit(isMainStore, version, dbId, diskless); /// /// Reset diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index 668bf78b58..e7c772d55d 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -2761,8 +2761,14 @@ public int GetStoreCurrentVersion(int nodeIndex, bool isMainStore, ILogger logge if (line.StartsWith('#')) continue; var field = line.Trim().Split(':'); + + // Remove 'db0.' prefix + var sepIdx = field[0].IndexOf('.'); + if (sepIdx == -1) + continue; + var key = field[0].Substring(sepIdx + 1); - if (!Enum.TryParse(field[0], ignoreCase: true, out StoreInfoItem type)) + if (!Enum.TryParse(key, ignoreCase: true, out StoreInfoItem type)) continue; if (infoItems.Contains(type))