Skip to content

Commit

Permalink
some merge fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
TalZaccai committed Mar 5, 2025
1 parent 8476e35 commit 52e8339
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ public void OnStop(bool completed, long numberOfRecords, bool isMainStore, long
// Wait for flush and response to complete
replicationSyncManager.WaitForFlush().GetAwaiter().GetResult();

// Enqueue version change commit
replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(isMainStore, targetVersion);

logger?.LogTrace("{OnStop} {store} {numberOfRecords} {targetVersion}",
nameof(OnStop), isMainStore ? "MAIN STORE" : "OBJECT STORE", numberOfRecords, targetVersion);

Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
{
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
return;
storeWrapper.EnqueueCommit(isMainStore, newVersion, streaming: true);
storeWrapper.EnqueueCommit(isMainStore, newVersion, diskless: clusterProvider.serverOptions.ReplicaDisklessSync);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Databases/DatabaseManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public abstract Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit,
public abstract void ResetRevivificationStats();

/// <inheritdoc/>
public abstract void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false);
public abstract void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false);

/// <inheritdoc/>
public abstract GarnetDatabase[] GetDatabasesSnapshot();
Expand Down
4 changes: 2 additions & 2 deletions libs/server/Databases/IDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ public Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit, Cancellati
/// <param name="isMainStore"></param>
/// <param name="version"></param>
/// <param name="dbId"></param>
/// <param name="streaming"></param>
public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false);
/// <param name="diskless"></param>
public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false);

/// <summary>
/// Get a snapshot of all active databases
Expand Down
4 changes: 2 additions & 2 deletions libs/server/Databases/MultiDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,12 @@ public override void ResetRevivificationStats()
}
}

public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false)
public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false)
{
if (!TryGetOrAddDatabase(dbId, out var db))
throw new GarnetException($"Database with ID {dbId} was not found.");

EnqueueDatabaseCommit(ref db, isMainStore, version, streaming);
EnqueueDatabaseCommit(ref db, isMainStore, version, diskless);
}

/// <inheritdoc/>
Expand Down
4 changes: 2 additions & 2 deletions libs/server/Databases/SingleDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ public override void ResetRevivificationStats()
}

/// <inheritdoc/>
public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false)
public override void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false)
{
ArgumentOutOfRangeException.ThrowIfNotEqual(dbId, 0);

EnqueueDatabaseCommit(ref DefaultDatabase, isMainStore, version, streaming);
EnqueueDatabaseCommit(ref DefaultDatabase, isMainStore, version, diskless);
}

public override GarnetDatabase[] GetDatabasesSnapshot() => [DefaultDatabase];
Expand Down
6 changes: 3 additions & 3 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,9 @@ public void RecoverCheckpoint(bool replicaRecover = false, bool recoverMainStore
/// <param name="isMainStore"></param>
/// <param name="version"></param>
/// <param name="dbId"></param>
/// <param name="streaming"></param>
public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool streaming = false) =>
this.databaseManager.EnqueueCommit(isMainStore, version, dbId, streaming);
/// <param name="diskless"></param>
public void EnqueueCommit(bool isMainStore, long version, int dbId = 0, bool diskless = false) =>
this.databaseManager.EnqueueCommit(isMainStore, version, dbId, diskless);

/// <summary>
/// Reset
Expand Down
8 changes: 7 additions & 1 deletion test/Garnet.test.cluster/ClusterTestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2761,8 +2761,14 @@ public int GetStoreCurrentVersion(int nodeIndex, bool isMainStore, ILogger logge
if (line.StartsWith('#'))
continue;
var field = line.Trim().Split(':');

// Remove 'db0.' prefix
var sepIdx = field[0].IndexOf('.');
if (sepIdx == -1)
continue;
var key = field[0].Substring(sepIdx + 1);

if (!Enum.TryParse(field[0], ignoreCase: true, out StoreInfoItem type))
if (!Enum.TryParse(key, ignoreCase: true, out StoreInfoItem type))
continue;

if (infoItems.Contains(type))
Expand Down

0 comments on commit 52e8339

Please sign in to comment.