diff --git a/Version.props b/Version.props
index 315929a5b1..8442e703bf 100644
--- a/Version.props
+++ b/Version.props
@@ -1,6 +1,6 @@
- 1.0.53
+ 1.0.54
diff --git a/libs/client/ClientSession/GarnetClientSessionIncremental.cs b/libs/client/ClientSession/GarnetClientSessionIncremental.cs
new file mode 100644
index 0000000000..35b91ad1fe
--- /dev/null
+++ b/libs/client/ClientSession/GarnetClientSessionIncremental.cs
@@ -0,0 +1,195 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Garnet.common;
+using Garnet.networking;
+using Microsoft.Extensions.Logging;
+using Tsavorite.core;
+
+namespace Garnet.client
+{
+ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
+ {
+ bool isMainStore;
+ byte* curr, head;
+ int keyValuePairCount;
+ TaskCompletionSource currTcsIterationTask = null;
+
+ ///
+ /// Getter to compute how much space to leave at the front of the buffer
+ /// in order to write the maximum possible RESP length header (of length bufferSize)
+ ///
+ int ExtraSpace =>
+ 1 // $
+ + bufferSizeDigits // Number of digits in maximum possible length (will be written with zero padding)
+ + 2 // \r\n
+ + 4; // We write a 4-byte int keyCount at the start of the payload
+
+ ///
+ /// Check if header for batch is initialized
+ ///
+ public bool NeedsInitialization => curr == null;
+
+ ///
+ /// Flush and initialize buffers/parameters used for migrate command
+ ///
+ ///
+ public void InitializeIterationBuffer(TimeSpan iterationProgressFreq = default)
+ {
+ Flush();
+ currTcsIterationTask = null;
+ curr = head = null;
+ keyValuePairCount = 0;
+ this.iterationProgressFreq = default ? TimeSpan.FromSeconds(5) : iterationProgressFreq;
+ }
+
+ ///
+ /// Send key value pair and reset migrate buffers
+ ///
+ public Task SendAndResetIterationBuffer()
+ {
+ if (keyValuePairCount == 0) return null;
+
+ Debug.Assert(end - curr >= 2);
+ *curr++ = (byte)'\r';
+ *curr++ = (byte)'\n';
+
+ // Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
+ var size = (int)(curr - 2 - head - (ExtraSpace - 4));
+ TrackIterationProgress(keyValuePairCount, size);
+ var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
+ Debug.Assert(success);
+
+ // Number of key value pairs in payload
+ *(int*)head = keyValuePairCount;
+
+ // Reset offset and flush buffer
+ offset = curr;
+ Flush();
+ Interlocked.Increment(ref numCommands);
+
+ // Return outstanding task and reset current tcs
+ var task = currTcsIterationTask.Task;
+ currTcsIterationTask = null;
+ curr = head = null;
+ keyValuePairCount = 0;
+ return task;
+ }
+
+ ///
+ /// Try write key value pair for main store directly to the client buffer
+ ///
+ ///
+ ///
+ ///
+ ///
+ public bool TryWriteKeyValueSpanByte(ref SpanByte key, ref SpanByte value, out Task task)
+ {
+ task = null;
+ // Try write key value pair directly to client buffer
+ if (!WriteSerializedSpanByte(ref key, ref value))
+ {
+ // If failed to write because no space left send outstanding data and retrieve task
+ // Caller is responsible for retrying
+ task = SendAndResetIterationBuffer();
+ return false;
+ }
+
+ keyValuePairCount++;
+ return true;
+
+ bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value)
+ {
+ var totalLen = key.TotalSize + value.TotalSize + 2 + 2;
+ if (totalLen > (int)(end - curr))
+ return false;
+
+ key.CopyTo(curr);
+ curr += key.TotalSize;
+ value.CopyTo(curr);
+ curr += value.TotalSize;
+ return true;
+ }
+ }
+
+ ///
+ /// Try write key value pair for object store directly to the client buffer
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration, out Task task)
+ {
+ task = null;
+ // Try write key value pair directly to client buffer
+ if (!WriteSerializedKeyValueByteArray(key, value, expiration))
+ {
+ // If failed to write because no space left send outstanding data and retrieve task
+ // Caller is responsible for retrying
+ task = SendAndResetIterationBuffer();
+ return false;
+ }
+
+ keyValuePairCount++;
+ return true;
+
+ bool WriteSerializedKeyValueByteArray(byte[] key, byte[] value, long expiration)
+ {
+ // We include space for newline at the end, to be added before sending
+ int totalLen = 4 + key.Length + 4 + value.Length + 8 + 2;
+ if (totalLen > (int)(end - curr))
+ return false;
+
+ *(int*)curr = key.Length;
+ curr += 4;
+ fixed (byte* keyPtr = key)
+ Buffer.MemoryCopy(keyPtr, curr, key.Length, key.Length);
+ curr += key.Length;
+
+ *(int*)curr = value.Length;
+ curr += 4;
+ fixed (byte* valPtr = value)
+ Buffer.MemoryCopy(valPtr, curr, value.Length, value.Length);
+ curr += value.Length;
+
+ *(long*)curr = expiration;
+ curr += 8;
+
+ return true;
+ }
+ }
+
+ long lastLog = 0;
+ long totalKeyCount = 0;
+ long totalPayloadSize = 0;
+ TimeSpan iterationProgressFreq;
+
+ ///
+ /// Logging of migrate session status
+ ///
+ ///
+ ///
+ ///
+ private void TrackIterationProgress(int keyCount, int size, bool completed = false)
+ {
+ totalKeyCount += keyCount;
+ totalPayloadSize += size;
+ var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
+ if (completed || lastLog == 0 || duration >= iterationProgressFreq)
+ {
+ logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
+ completed ? "COMPLETED" : "MIGRATING",
+ isMainStore,
+ totalKeyCount.ToString("N0"),
+ ((long)((double)totalPayloadSize / 1024)).ToString("N0"));
+ lastLog = Stopwatch.GetTimestamp();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs b/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs
index d508fe25f9..b23f56d34a 100644
--- a/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs
+++ b/libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs
@@ -8,8 +8,6 @@
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
-using Microsoft.Extensions.Logging;
-using Tsavorite.core;
namespace Garnet.client
{
@@ -166,49 +164,16 @@ public Task SetSlotRange(Memory state, string nodeid, List<(int, i
return tcs.Task;
}
- ///
- /// Check if migrate command parameters need to be initialized
- ///
- public bool InitMigrateCommand => curr == null;
-
- ///
- /// Getter to compute how much space to leave at the front of the buffer
- /// in order to write the maximum possible RESP length header (of length bufferSize)
- ///
- int ExtraSpace =>
- 1 // $
- + bufferSizeDigits // Number of digits in maximum possible length (will be written with zero padding)
- + 2 // \r\n
- + 4; // We write a 4-byte int keyCount at the start of the payload
-
- bool isMainStore;
- byte* curr, head;
- int keyCount;
- TaskCompletionSource currTcsMigrate = null;
-
- ///
- /// Flush and initialize buffers/parameters used for migrate command
- ///
- ///
- public void InitMigrateBuffer(TimeSpan migrateProgressFreq = default)
- {
- Flush();
- currTcsMigrate = null;
- curr = head = null;
- keyCount = 0;
- this.migrateProgressFreq = default ? TimeSpan.FromSeconds(5) : migrateProgressFreq;
- }
-
///
/// Write parameters of CLUSTER MIGRATE directly to the client buffer
///
///
///
///
- public void SetClusterMigrate(string sourceNodeId, bool replace, bool isMainStore)
+ public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMainStore)
{
- currTcsMigrate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- tcsQueue.Enqueue(currTcsMigrate);
+ currTcsIterationTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;
@@ -274,39 +239,6 @@ public void SetClusterMigrate(string sourceNodeId, bool replace, bool isMainStor
curr += ExtraSpace;
}
- ///
- /// Send key value pair and reset migrate buffers
- ///
- public Task SendAndResetMigrate()
- {
- if (keyCount == 0) return null;
-
- Debug.Assert(end - curr >= 2);
- *curr++ = (byte)'\r';
- *curr++ = (byte)'\n';
-
- // Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
- var size = (int)(curr - 2 - head - (ExtraSpace - 4));
- TrackMigrateProgress(keyCount, size);
- var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
- Debug.Assert(success);
-
- // Number of key value pairs in payload
- *(int*)head = keyCount;
-
- // Reset offset and flush buffer
- offset = curr;
- Flush();
- Interlocked.Increment(ref numCommands);
-
- // Return outstanding task and reset current tcs
- var task = currTcsMigrate.Task;
- currTcsMigrate = null;
- curr = head = null;
- keyCount = 0;
- return task;
- }
-
///
/// Signal completion of migration by sending an empty payload
///
@@ -316,7 +248,7 @@ public Task SendAndResetMigrate()
///
public Task CompleteMigrate(string sourceNodeId, bool replace, bool isMainStore)
{
- SetClusterMigrate(sourceNodeId, replace, isMainStore);
+ SetClusterMigrateHeader(sourceNodeId, replace, isMainStore);
Debug.Assert(end - curr >= 2);
*curr++ = (byte)'\r';
@@ -324,12 +256,12 @@ public Task CompleteMigrate(string sourceNodeId, bool replace, bool isMa
// Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
var size = (int)(curr - 2 - head - (ExtraSpace - 4));
- TrackMigrateProgress(keyCount, size, completed: true);
+ TrackIterationProgress(keyValuePairCount, size, completed: true);
var success = RespWriteUtils.TryWritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
Debug.Assert(success);
// Number of key value pairs in payload
- *(int*)head = keyCount;
+ *(int*)head = keyValuePairCount;
// Reset offset and flush buffer
offset = curr;
@@ -337,123 +269,11 @@ public Task CompleteMigrate(string sourceNodeId, bool replace, bool isMa
Interlocked.Increment(ref numCommands);
// Return outstanding task and reset current tcs
- var task = currTcsMigrate.Task;
- currTcsMigrate = null;
+ var task = currTcsIterationTask.Task;
+ currTcsIterationTask = null;
curr = head = null;
- keyCount = 0;
+ keyValuePairCount = 0;
return task;
}
-
- ///
- /// Try write key value pair for main store directly to the client buffer
- ///
- ///
- ///
- ///
- ///
- public bool TryWriteKeyValueSpanByte(ref SpanByte key, ref SpanByte value, out Task migrateTask)
- {
- migrateTask = null;
- // Try write key value pair directly to client buffer
- if (!WriteSerializedSpanByte(ref key, ref value))
- {
- // If failed to write because no space left send outstanding data and retrieve task
- // Caller is responsible for retrying
- migrateTask = SendAndResetMigrate();
- return false;
- }
-
- keyCount++;
- return true;
- }
-
- ///
- /// Try write key value pair for object store directly to the client buffer
- ///
- ///
- ///
- ///
- ///
- ///
- public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration, out Task migrateTask)
- {
- migrateTask = null;
- // Try write key value pair directly to client buffer
- if (!WriteSerializedKeyValueByteArray(key, value, expiration))
- {
- // If failed to write because no space left send outstanding data and retrieve task
- // Caller is responsible for retrying
- migrateTask = SendAndResetMigrate();
- return false;
- }
-
- keyCount++;
- return true;
- }
-
- private bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value)
- {
- var totalLen = key.TotalSize + value.TotalSize + 2 + 2;
- if (totalLen > (int)(end - curr))
- return false;
-
- key.CopyTo(curr);
- curr += key.TotalSize;
- value.CopyTo(curr);
- curr += value.TotalSize;
- return true;
- }
-
- private bool WriteSerializedKeyValueByteArray(byte[] key, byte[] value, long expiration)
- {
- // We include space for newline at the end, to be added before sending
- int totalLen = 4 + key.Length + 4 + value.Length + 8 + 2;
- if (totalLen > (int)(end - curr))
- return false;
-
- *(int*)curr = key.Length;
- curr += 4;
- fixed (byte* keyPtr = key)
- Buffer.MemoryCopy(keyPtr, curr, key.Length, key.Length);
- curr += key.Length;
-
- *(int*)curr = value.Length;
- curr += 4;
- fixed (byte* valPtr = value)
- Buffer.MemoryCopy(valPtr, curr, value.Length, value.Length);
- curr += value.Length;
-
- *(long*)curr = expiration;
- curr += 8;
-
- return true;
- }
-
- long lastLog = 0;
- long totalKeyCount = 0;
- long totalPayloadSize = 0;
- TimeSpan migrateProgressFreq;
-
- ///
- /// Logging of migrate session status
- ///
- ///
- ///
- ///
- private void TrackMigrateProgress(int keyCount, int size, bool completed = false)
- {
- totalKeyCount += keyCount;
- totalPayloadSize += size;
- var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
- if (completed || lastLog == 0 || duration >= migrateProgressFreq)
- {
- logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
- completed ? "COMPLETED" : "MIGRATING",
- isMainStore,
- totalKeyCount.ToString("N0"),
- ((long)((double)totalPayloadSize / 1024)).ToString("N0"));
- lastLog = Stopwatch.GetTimestamp();
- }
- }
}
}
\ No newline at end of file
diff --git a/libs/cluster/Server/Migration/MigrateSessionKeys.cs b/libs/cluster/Server/Migration/MigrateSessionKeys.cs
index 61942ec69c..abc5028eba 100644
--- a/libs/cluster/Server/Migration/MigrateSessionKeys.cs
+++ b/libs/cluster/Server/Migration/MigrateSessionKeys.cs
@@ -2,7 +2,6 @@
// Licensed under the MIT license.
using System;
-using System.Runtime.CompilerServices;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
@@ -74,7 +73,7 @@ private bool MigrateKeysFromMainStore()
}
// Flush data in client buffer
- if (!HandleMigrateTaskResponse(_gcs.SendAndResetMigrate()))
+ if (!HandleMigrateTaskResponse(_gcs.SendAndResetIterationBuffer()))
return false;
DeleteKeys();
@@ -130,7 +129,7 @@ private bool MigrateKeysFromObjectStore()
}
// Flush data in client buffer
- if (!HandleMigrateTaskResponse(_gcs.SendAndResetMigrate()))
+ if (!HandleMigrateTaskResponse(_gcs.SendAndResetIterationBuffer()))
return false;
}
finally
@@ -183,14 +182,14 @@ public bool MigrateKeys()
return false;
// Migrate main store keys
- _gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequncy);
+ _gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy);
if (!MigrateKeysFromMainStore())
return false;
// Migrate object store keys
if (!clusterProvider.serverOptions.DisableObjects)
{
- _gcs.InitMigrateBuffer(clusterProvider.storeWrapper.loggingFrequncy);
+ _gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy);
if (!MigrateKeysFromObjectStore())
return false;
}
diff --git a/libs/cluster/Server/Migration/MigrateSessionSend.cs b/libs/cluster/Server/Migration/MigrateSessionSend.cs
index 6beaeb2b63..9b9a1ad802 100644
--- a/libs/cluster/Server/Migration/MigrateSessionSend.cs
+++ b/libs/cluster/Server/Migration/MigrateSessionSend.cs
@@ -19,8 +19,8 @@ internal sealed unsafe partial class MigrateSession : IDisposable
private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte value)
{
// Check if we need to initialize cluster migrate command arguments
- if (_gcs.InitMigrateCommand)
- _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: true);
+ if (_gcs.NeedsInitialization)
+ _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: true);
// Try write serialized key value to client buffer
while (!_gcs.TryWriteKeyValueSpanByte(ref key, ref value, out var task))
@@ -30,7 +30,7 @@ private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte val
return false;
// re-initialize cluster migrate command parameters
- _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: true);
+ _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: true);
}
return true;
}
@@ -45,15 +45,15 @@ private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte val
private bool WriteOrSendObjectStoreKeyValuePair(byte[] key, byte[] value, long expiration)
{
// Check if we need to initialize cluster migrate command arguments
- if (_gcs.InitMigrateCommand)
- _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: false);
+ if (_gcs.NeedsInitialization)
+ _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: false);
while (!_gcs.TryWriteKeyValueByteArray(key, value, expiration, out var task))
{
// Flush key value pairs in the buffer
if (!HandleMigrateTaskResponse(task))
return false;
- _gcs.SetClusterMigrate(_sourceNodeId, _replaceOption, isMainStore: false);
+ _gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: false);
}
return true;
}
diff --git a/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs b/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs
index 5464171dad..7065ece0e4 100644
--- a/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs
+++ b/libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs
@@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Garnet.client;
+using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
@@ -23,6 +24,21 @@ internal sealed class AofSyncTaskInfo : IBulkLogEntryConsumer, IDisposable
readonly long startAddress;
public long previousAddress;
+ ///
+ /// Used to mark if syncing is in progress
+ ///
+ SingleWriterMultiReaderLock aofSyncInProgress;
+
+ ///
+ /// Check if client connection is healthy
+ ///
+ public bool IsConnected => garnetClient != null && garnetClient.IsConnected;
+
+ ///
+ /// Return start address for this AOF iterator
+ ///
+ public long StartAddress => startAddress;
+
public AofSyncTaskInfo(
ClusterProvider clusterProvider,
AofTaskStore aofTaskStore,
@@ -40,7 +56,7 @@ public AofSyncTaskInfo(
this.garnetClient = garnetClient;
this.startAddress = startAddress;
previousAddress = startAddress;
- this.cts = new CancellationTokenSource();
+ cts = new CancellationTokenSource();
}
public void Dispose()
@@ -53,6 +69,11 @@ public void Dispose()
// Finally, dispose the cts
cts?.Dispose();
+
+ // Dispose only if AOF sync has not started
+ // otherwise sync task will dispose the client
+ if (aofSyncInProgress.TryWriteLock())
+ garnetClient?.Dispose();
}
public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress, bool isProtected)
@@ -87,9 +108,17 @@ public async Task ReplicaSyncTask()
{
logger?.LogInformation("Starting ReplicationManager.ReplicaSyncTask for remote node {remoteNodeId} starting from address {address}", remoteNodeId, startAddress);
+ var failedToStart = false;
try
{
- garnetClient.Connect();
+ if (!aofSyncInProgress.TryWriteLock())
+ {
+ logger?.LogWarning("{method} AOF sync for {remoteNodeId} failed to start", nameof(ReplicaSyncTask), remoteNodeId);
+ failedToStart = true;
+ return;
+ }
+
+ if (!IsConnected) garnetClient.Connect();
iter = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(startAddress, long.MaxValue, scanUncommitted: true, recover: false, logger: logger);
@@ -105,7 +134,7 @@ public async Task ReplicaSyncTask()
}
finally
{
- garnetClient.Dispose();
+ if (!failedToStart) garnetClient.Dispose();
var (address, port) = clusterProvider.clusterManager.CurrentConfig.GetWorkerAddressFromNodeId(remoteNodeId);
logger?.LogWarning("AofSync task terminated; client disposed {remoteNodeId} {address} {port} {currentAddress}", remoteNodeId, address, port, previousAddress);
diff --git a/libs/cluster/Session/RespClusterMigrateCommands.cs b/libs/cluster/Session/RespClusterMigrateCommands.cs
index b50a23cb29..26f6664b4c 100644
--- a/libs/cluster/Session/RespClusterMigrateCommands.cs
+++ b/libs/cluster/Session/RespClusterMigrateCommands.cs
@@ -103,7 +103,7 @@ private bool NetworkClusterMigrate(out bool invalidParameters)
var keyCount = *(int*)payloadPtr;
payloadPtr += 4;
var i = 0;
- TrackImportProgress(keyCount, isMainStore: true, keyCount == 0);
+ TrackImportProgress(keyCount, isMainStore: false, keyCount == 0);
while (i < keyCount)
{
if (!RespReadUtils.TryReadSerializedData(out var key, out var data, out var expiration, ref payloadPtr, payloadEndPtr))
diff --git a/libs/common/SingleWriterMultiReaderLock.cs b/libs/common/SingleWriterMultiReaderLock.cs
index 6e6b12ec2d..6c448399f5 100644
--- a/libs/common/SingleWriterMultiReaderLock.cs
+++ b/libs/common/SingleWriterMultiReaderLock.cs
@@ -87,6 +87,25 @@ public void ReadUnlock()
Interlocked.Decrement(ref _lock);
}
+ ///
+ /// Continuously attempt to acquire write lock until lock is acquired or it is write locked
+ ///
+ /// Return true if current thread is the one that acquired write lock
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool OneWriteLock()
+ {
+ while (true)
+ {
+ var isWriteLocked = IsWriteLocked;
+ var acquiredWriteLock = TryWriteLock();
+ if (isWriteLocked || acquiredWriteLock)
+ {
+ return acquiredWriteLock;
+ }
+ Thread.Yield();
+ }
+ }
+
///
public override string ToString()
=> _lock.ToString();
diff --git a/playground/TstRunner/Program.cs b/playground/TstRunner/Program.cs
index ad8d1b36a2..515fb2288a 100644
--- a/playground/TstRunner/Program.cs
+++ b/playground/TstRunner/Program.cs
@@ -21,7 +21,7 @@ static void Main()
while (true)
{
var clusterMigrateTests = new ClusterMigrateTests(false);
- var clusterReplicationTests = new ClusterReplicationTests(false);
+ var clusterReplicationTests = new ClusterReplicationBaseTests();
Console.WriteLine($">>>>>>>>>> run: {i} StartedOn: {DateTime.Now}");
swatch.Start();
diff --git a/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs b/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs
deleted file mode 100644
index df98c227eb..0000000000
--- a/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-using NUnit.Framework;
-
-namespace Garnet.test.cluster
-{
- [TestFixture(false, true), NonParallelizable]
- public unsafe class ClusterReplicationAsyncReplayTests(bool UseTLS = false, bool asyncReplay = false) : ClusterReplicationTests(UseTLS, asyncReplay) { }
-}
\ No newline at end of file
diff --git a/test/Garnet.test.cluster/ClusterReplicationTLSTests.cs b/test/Garnet.test.cluster/ClusterReplicationTLSTests.cs
deleted file mode 100644
index 766819b0ec..0000000000
--- a/test/Garnet.test.cluster/ClusterReplicationTLSTests.cs
+++ /dev/null
@@ -1,72 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-using NUnit.Framework;
-
-namespace Garnet.test.cluster
-{
- [TestFixture, NonParallelizable]
- public unsafe class ClusterTLSRT
- {
- ClusterReplicationTests tests;
-
- [SetUp]
- public void Setup()
- {
- tests = new ClusterReplicationTests(UseTLS: true);
- tests.Setup();
- }
-
- [TearDown]
- public void TearDown()
- {
- tests.TearDown();
- tests = null;
- }
-
- [Test, Order(1)]
- [Category("REPLICATION")]
- public void ClusterTLSR([Values] bool disableObjects)
- => tests.ClusterSRTest(disableObjects);
-
- [Test, Order(2)]
- [Category("REPLICATION")]
- public void ClusterTLSRCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects)
- => tests.ClusterSRNoCheckpointRestartSecondary(performRMW, disableObjects);
-
- [Test, Order(3)]
- [Category("REPLICATION")]
- public void ClusterTLSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects)
- => tests.ClusterSRPrimaryCheckpoint(performRMW, disableObjects);
-
- [Test, Order(4)]
- [Category("REPLICATION")]
- public void ClusterTLSRPrimaryCheckpointRetrieve([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory, [Values] bool manySegments)
- => tests.ClusterSRPrimaryCheckpointRetrieve(performRMW, disableObjects, lowMemory, manySegments);
-
- [Test, Order(5)]
- [Category("REPLICATION")]
- public void ClusterTLSCheckpointRetrieveDisableStorageTier([Values] bool performRMW, [Values] bool disableObjects)
- => tests.ClusterCheckpointRetrieveDisableStorageTier(performRMW, disableObjects);
-
- [Test, Order(6)]
- [Category("REPLICATION")]
- public void ClusterTLSRAddReplicaAfterPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory)
- => tests.ClusterSRAddReplicaAfterPrimaryCheckpoint(performRMW, disableObjects, lowMemory);
-
- [Test, Order(7)]
- [Category("REPLICATION")]
- public void ClusterTLSRPrimaryRestart([Values] bool performRMW, [Values] bool disableObjects)
- => tests.ClusterSRPrimaryRestart(performRMW, disableObjects);
-
- [Test, Order(8)]
- [Category("REPLICATION")]
- public void ClusterTLSRRedirectWrites()
- => tests.ClusterSRRedirectWrites();
-
- [Test, Order(9)]
- [Category("REPLICATION")]
- public void ClusterTLSRReplicaOfTest([Values] bool performRMW)
- => tests.ClusterSRReplicaOfTest(performRMW);
- }
-}
\ No newline at end of file
diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs
index 9d78ea8c22..3bb8189cdd 100644
--- a/test/Garnet.test.cluster/ClusterTestUtils.cs
+++ b/test/Garnet.test.cluster/ClusterTestUtils.cs
@@ -1864,7 +1864,7 @@ public string ClusterReplicate(IPEndPoint endPoint, string primaryNodeId, bool a
try
{
var server = redis.GetServer(endPoint);
- var args = async ? new List