diff --git a/Version.props b/Version.props
index dcc15afe0e..5a3d59468b 100644
--- a/Version.props
+++ b/Version.props
@@ -1,6 +1,6 @@
- 1.0.56
+ 1.0.57
diff --git a/libs/cluster/ClusterFactory.cs b/libs/cluster/ClusterFactory.cs
index 8a8ef903e6..a174beaa5a 100644
--- a/libs/cluster/ClusterFactory.cs
+++ b/libs/cluster/ClusterFactory.cs
@@ -13,8 +13,8 @@ namespace Garnet.cluster
public class ClusterFactory : IClusterFactory
{
///
- public DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default)
- => new ReplicationLogCheckpointManager(deviceFactory, checkpointNamingScheme, isMainStore, logger: logger);
+ public DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default)
+ => new ReplicationLogCheckpointManager(deviceFactoryCreator, checkpointNamingScheme, isMainStore, logger: logger);
///
public IClusterProvider CreateClusterProvider(StoreWrapper store)
diff --git a/libs/cluster/Server/Replication/ReplicationLogCheckpointManager.cs b/libs/cluster/Server/Replication/ReplicationLogCheckpointManager.cs
index 7ab020d178..09ccc5217c 100644
--- a/libs/cluster/Server/Replication/ReplicationLogCheckpointManager.cs
+++ b/libs/cluster/Server/Replication/ReplicationLogCheckpointManager.cs
@@ -11,12 +11,12 @@
namespace Garnet.cluster
{
internal sealed class ReplicationLogCheckpointManager(
- INamedDeviceFactory deviceFactory,
+ INamedDeviceFactoryCreator deviceFactoryCreator,
ICheckpointNamingScheme checkpointNamingScheme,
bool isMainStore,
bool removeOutdated = false,
int fastCommitThrottleFreq = 0,
- ILogger logger = null) : DeviceLogCommitCheckpointManager(deviceFactory, checkpointNamingScheme, removeOutdated: false, fastCommitThrottleFreq, logger), IDisposable
+ ILogger logger = null) : DeviceLogCommitCheckpointManager(deviceFactoryCreator, checkpointNamingScheme, removeOutdated: false, fastCommitThrottleFreq, logger), IDisposable
{
public long CurrentSafeAofAddress = 0;
public long RecoveredSafeAofAddress = 0;
diff --git a/libs/common/HashSlotUtils.cs b/libs/common/HashSlotUtils.cs
index 3fbb6a6c25..f1811ce3a7 100644
--- a/libs/common/HashSlotUtils.cs
+++ b/libs/common/HashSlotUtils.cs
@@ -102,7 +102,10 @@ public static unsafe ushort HashSlot(byte* keyPtr, int ksize)
var end = keyPtr + ksize;
// Find first occurence of '{'
- while (startPtr < end && *startPtr != '{') { startPtr++; }
+ while (startPtr < end && *startPtr != '{')
+ {
+ startPtr++;
+ }
// Return early if did not find '{'
if (startPtr == end) return (ushort)(Hash(keyPtr, ksize) & 16383);
diff --git a/libs/common/StreamProvider.cs b/libs/common/StreamProvider.cs
index 68912f2d6d..ad3bf0d36c 100644
--- a/libs/common/StreamProvider.cs
+++ b/libs/common/StreamProvider.cs
@@ -149,20 +149,21 @@ public static IStreamProvider GetStreamProvider(FileLocationType locationType, s
internal class AzureStreamProvider : StreamProviderBase
{
private readonly string _connectionString;
+ private readonly AzureStorageNamedDeviceFactoryCreator azureStorageNamedDeviceFactoryCreator;
public AzureStreamProvider(string connectionString)
{
this._connectionString = connectionString;
+ this.azureStorageNamedDeviceFactoryCreator = new AzureStorageNamedDeviceFactoryCreator(this._connectionString, default);
}
protected override IDevice GetDevice(string path)
{
var fileInfo = new FileInfo(path);
- INamedDeviceFactory settingsDeviceFactoryCreator = new AzureStorageNamedDeviceFactory(this._connectionString, default);
// Get the container info, if it does not exist it will be created
- settingsDeviceFactoryCreator.Initialize($"{fileInfo.Directory?.Name}");
- var settingsDevice = settingsDeviceFactoryCreator.Get(new FileDescriptor("", fileInfo.Name));
+ var settingsDeviceFactory = azureStorageNamedDeviceFactoryCreator.Create($"{fileInfo.Directory?.Name}");
+ var settingsDevice = settingsDeviceFactory.Get(new FileDescriptor("", fileInfo.Name));
settingsDevice.Initialize(MaxConfigFileSizeAligned, epoch: null, omitSegmentIdFromFilename: false);
return settingsDevice;
}
@@ -183,19 +184,20 @@ protected override long GetBytesToWrite(byte[] bytes, IDevice device)
internal class LocalFileStreamProvider : StreamProviderBase
{
private readonly bool readOnly;
+ private readonly LocalStorageNamedDeviceFactoryCreator localDeviceFactoryCreator;
public LocalFileStreamProvider(bool readOnly = false)
{
this.readOnly = readOnly;
+ this.localDeviceFactoryCreator = new LocalStorageNamedDeviceFactoryCreator(disableFileBuffering: false, readOnly: readOnly);
}
protected override IDevice GetDevice(string path)
{
var fileInfo = new FileInfo(path);
- INamedDeviceFactory settingsDeviceFactoryCreator = new LocalStorageNamedDeviceFactory(disableFileBuffering: false, readOnly: readOnly);
- settingsDeviceFactoryCreator.Initialize("");
- var settingsDevice = settingsDeviceFactoryCreator.Get(new FileDescriptor(fileInfo.DirectoryName, fileInfo.Name));
+ var settingsDeviceFactory = localDeviceFactoryCreator.Create("");
+ var settingsDevice = settingsDeviceFactory.Get(new FileDescriptor(fileInfo.DirectoryName, fileInfo.Name));
settingsDevice.Initialize(-1, epoch: null, omitSegmentIdFromFilename: true);
return settingsDevice;
}
diff --git a/libs/host/Configuration/Options.cs b/libs/host/Configuration/Options.cs
index 41716c4d72..b4dfa81aa1 100644
--- a/libs/host/Configuration/Options.cs
+++ b/libs/host/Configuration/Options.cs
@@ -765,8 +765,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
ThreadPoolMaxIOCompletionThreads = ThreadPoolMaxIOCompletionThreads,
NetworkConnectionLimit = NetworkConnectionLimit,
DeviceFactoryCreator = useAzureStorage
- ? () => new AzureStorageNamedDeviceFactory(AzureStorageConnectionString, logger)
- : () => new LocalStorageNamedDeviceFactory(useNativeDeviceLinux: UseNativeDeviceLinux.GetValueOrDefault(), logger: logger),
+ ? new AzureStorageNamedDeviceFactoryCreator(AzureStorageConnectionString, logger)
+ : new LocalStorageNamedDeviceFactoryCreator(useNativeDeviceLinux: UseNativeDeviceLinux.GetValueOrDefault(), logger: logger),
CheckpointThrottleFlushDelayMs = CheckpointThrottleFlushDelayMs,
EnableScatterGatherGet = EnableScatterGatherGet.GetValueOrDefault(),
ReplicaSyncDelayMs = ReplicaSyncDelayMs,
diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs
index e1c61454bb..65950431bd 100644
--- a/libs/host/GarnetServer.cs
+++ b/libs/host/GarnetServer.cs
@@ -304,15 +304,14 @@ private void CreateMainStore(IClusterFactory clusterFactory, out string checkpoi
kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
kvSettings.CheckpointVersionSwitchBarrier = opts.EnableCluster;
- var checkpointFactory = opts.DeviceFactoryCreator();
if (opts.EnableCluster)
{
- kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(checkpointFactory,
+ kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), isMainStore: true, logger);
}
else
{
- kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(checkpointFactory,
+ kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), removeOutdated: true);
}
@@ -335,11 +334,11 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana
if (opts.EnableCluster)
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
- opts.DeviceFactoryCreator(),
+ opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
isMainStore: false, logger);
else
- objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator(),
+ objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
removeOutdated: true);
@@ -407,9 +406,8 @@ public void Dispose(bool deleteDir = true)
logFactory?.Delete(new FileDescriptor { directoryName = "" });
if (opts.CheckpointDir != opts.LogDir && !string.IsNullOrEmpty(opts.CheckpointDir))
{
- var ckptdir = opts.DeviceFactoryCreator();
- ckptdir.Initialize(opts.CheckpointDir);
- ckptdir.Delete(new FileDescriptor { directoryName = "" });
+ var checkpointDeviceFactory = opts.DeviceFactoryCreator.Create(opts.CheckpointDir);
+ checkpointDeviceFactory.Delete(new FileDescriptor { directoryName = "" });
}
}
}
diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json
index 15cc917695..9e217fb06f 100644
--- a/libs/resources/RespCommandsDocs.json
+++ b/libs/resources/RespCommandsDocs.json
@@ -1027,6 +1027,14 @@
"Token": "NO"
}
]
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "MAXAGE",
+ "DisplayText": "maxage",
+ "Type": "Integer",
+ "Token": "MAXAGE",
+ "ArgumentFlags": "Optional"
}
]
}
@@ -1175,7 +1183,7 @@
{
"Command": "CLUSTER",
"Name": "CLUSTER",
- "Summary": "A container for Redis Cluster commands.",
+ "Summary": "A container for Redis Cluster internal commands.",
"Group": "Cluster",
"Complexity": "Depends on subcommand.",
"SubCommands": [
@@ -1429,6 +1437,13 @@
"Group": "Cluster",
"Complexity": "O(N) where N is the total number of Cluster nodes"
},
+ {
+ "Command": "CLUSTER_PUBLISH",
+ "Name": "CLUSTER|PUBLISH",
+ "Summary": "Processes a forwarded published message from any node in the cluster",
+ "Group": "Cluster",
+ "Complexity": "O(1)"
+ },
{
"Command": "CLUSTER_REPLICAS",
"Name": "CLUSTER|REPLICAS",
@@ -1570,6 +1585,13 @@
"Complexity": "O(N) where N is the total number of Cluster nodes",
"DocFlags": "Deprecated",
"ReplacedBy": "\u0060CLUSTER SHARDS\u0060"
+ },
+ {
+ "Command": "CLUSTER_SPUBLISH",
+ "Name": "CLUSTER|SPUBLISH",
+ "Summary": "Processes a forwarded published message from a node in the same shard",
+ "Group": "Cluster",
+ "Complexity": "O(1)"
}
]
},
@@ -1603,22 +1625,6 @@
}
]
},
- {
- "Command": "COMMAND_INFO",
- "Name": "COMMAND|INFO",
- "Summary": "Returns information about one, multiple or all commands.",
- "Group": "Server",
- "Complexity": "O(N) where N is the number of commands to look up",
- "Arguments": [
- {
- "TypeDiscriminator": "RespCommandBasicArgument",
- "Name": "COMMAND-NAME",
- "DisplayText": "command-name",
- "Type": "String",
- "ArgumentFlags": "Optional, Multiple"
- }
- ]
- },
{
"Command": "COMMAND_GETKEYS",
"Name": "COMMAND|GETKEYS",
@@ -1662,6 +1668,22 @@
"ArgumentFlags": "Optional, Multiple"
}
]
+ },
+ {
+ "Command": "COMMAND_INFO",
+ "Name": "COMMAND|INFO",
+ "Summary": "Returns information about one, multiple or all commands.",
+ "Group": "Server",
+ "Complexity": "O(N) where N is the number of commands to look up",
+ "Arguments": [
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "COMMAND-NAME",
+ "DisplayText": "command-name",
+ "Type": "String",
+ "ArgumentFlags": "Optional, Multiple"
+ }
+ ]
}
]
},
@@ -1780,9 +1802,10 @@
{
"Command": "DEBUG",
"Name": "DEBUG",
- "Summary": "Depends on subcommand.",
+ "Summary": "A container for debugging commands.",
"Group": "Server",
- "Complexity": "O(1)"
+ "Complexity": "Depends on subcommand.",
+ "DocFlags": "SysCmd"
},
{
"Command": "DECR",
@@ -3890,6 +3913,14 @@
"Type": "Integer",
"Token": "COUNT",
"ArgumentFlags": "Optional"
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "NOVALUES",
+ "DisplayText": "novalues",
+ "Type": "PureToken",
+ "Token": "NOVALUES",
+ "ArgumentFlags": "Optional"
}
]
},
@@ -5419,7 +5450,7 @@
"Name": "NUMPARAMS",
"DisplayText": "numParams",
"Type": "Integer",
- "Summary": "Number of parameters of the command to register"
+ "Summary": "Numer of parameters of the command to register"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
@@ -5615,12 +5646,51 @@
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
- "Name": "SERIALIZEDVALUE",
+ "Name": "SERIALIZED-VALUE",
"DisplayText": "serialized-value",
"Type": "String"
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "REPLACE",
+ "DisplayText": "replace",
+ "Type": "PureToken",
+ "Token": "REPLACE",
+ "ArgumentFlags": "Optional"
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "ABSTTL",
+ "DisplayText": "absttl",
+ "Type": "PureToken",
+ "Token": "ABSTTL",
+ "ArgumentFlags": "Optional"
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "SECONDS",
+ "DisplayText": "seconds",
+ "Type": "Integer",
+ "Token": "IDLETIME",
+ "ArgumentFlags": "Optional"
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "FREQUENCY",
+ "DisplayText": "frequency",
+ "Type": "Integer",
+ "Token": "FREQ",
+ "ArgumentFlags": "Optional"
}
]
},
+ {
+ "Command": "ROLE",
+ "Name": "ROLE",
+ "Summary": "Returns the replication role.",
+ "Group": "Server",
+ "Complexity": "O(1)"
+ },
{
"Command": "RPOP",
"Name": "RPOP",
@@ -6558,6 +6628,27 @@
}
]
},
+ {
+ "Command": "SPUBLISH",
+ "Name": "SPUBLISH",
+ "Summary": "Post a message to a shard channel",
+ "Group": "PubSub",
+ "Complexity": "O(N) where N is the number of clients subscribed to the receiving shard channel.",
+ "Arguments": [
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "SHARDCHANNEL",
+ "DisplayText": "shardchannel",
+ "Type": "String"
+ },
+ {
+ "TypeDiscriminator": "RespCommandBasicArgument",
+ "Name": "MESSAGE",
+ "DisplayText": "message",
+ "Type": "String"
+ }
+ ]
+ },
{
"Command": "SRANDMEMBER",
"Name": "SRANDMEMBER",
@@ -6643,23 +6734,18 @@
]
},
{
- "Command": "SPUBLISH",
- "Name": "SPUBLISH",
- "Summary": "Posts a message to a shard channel.",
+ "Command": "SSUBSCRIBE",
+ "Name": "SSUBSCRIBE",
+ "Summary": "Listens for messages published to shard channels.",
"Group": "PubSub",
- "Complexity": "O(N) where N is the number of clients subscribed to the receiving shard channel.",
+ "Complexity": "O(N) where N is the number of shard channels to subscribe to.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "SHARDCHANNEL",
"DisplayText": "shardchannel",
- "Type": "String"
- },
- {
- "TypeDiscriminator": "RespCommandBasicArgument",
- "Name": "MESSAGE",
- "DisplayText": "message",
- "Type": "String"
+ "Type": "String",
+ "ArgumentFlags": "Multiple"
}
]
},
@@ -6695,22 +6781,6 @@
}
]
},
- {
- "Command": "SSUBSCRIBE",
- "Name": "SSUBSCRIBE",
- "Summary": "Listens for messages published to shard channels.",
- "Group": "PubSub",
- "Complexity": "O(N) where N is the number of shard channels to subscribe to.",
- "Arguments": [
- {
- "TypeDiscriminator": "RespCommandBasicArgument",
- "Name": "shardchannel",
- "DisplayText": "channel",
- "Type": "String",
- "ArgumentFlags": "Multiple"
- }
- ]
- },
{
"Command": "SUBSTR",
"Name": "SUBSTR",
@@ -6821,13 +6891,6 @@
}
]
},
- {
- "Command": "ROLE",
- "Name": "ROLE",
- "Summary": "Returns the replication role.",
- "Group": "Server",
- "Complexity": "O(1)"
- },
{
"Command": "UNLINK",
"Name": "UNLINK",
@@ -7593,7 +7656,7 @@
{
"Command": "ZRANGEBYLEX",
"Name": "ZRANGEBYLEX",
- "Summary": "Returns the number of members in a sorted set within a lexicographical range.",
+ "Summary": "Returns members in a sorted set within a lexicographical range.",
"Group": "SortedSet",
"Complexity": "O(log(N)\u002BM) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).",
"DocFlags": "Deprecated",
@@ -7610,13 +7673,13 @@
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
- "Type": "Double"
+ "Type": "String"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
- "Type": "Double"
+ "Type": "String"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json
index 2d84a2c6ce..d74adfda93 100644
--- a/libs/resources/RespCommandsInfo.json
+++ b/libs/resources/RespCommandsInfo.json
@@ -617,43 +617,6 @@
"Flags": "Admin, NoMulti, NoScript",
"AclCategories": "Admin, Dangerous, Slow, Garnet"
},
- {
- "Command": "CLUSTER_PUBLISH",
- "Name": "CLUSTER|PUBLISH",
- "IsInternal": true,
- "Arity": 4,
- "Flags": "Loading, NoScript, PubSub, Stale",
- "FirstKey": 1,
- "LastKey": 1,
- "Step": 1,
- "AclCategories": "Admin, PubSub, Slow, Garnet"
- },
- {
- "Command": "CLUSTER_SPUBLISH",
- "Name": "CLUSTER|SPUBLISH",
- "IsInternal": true,
- "Arity": 4,
- "Flags": "Loading, NoScript, PubSub, Stale",
- "FirstKey": 1,
- "LastKey": 1,
- "Step": 1,
- "AclCategories": "Admin, PubSub, Slow, Garnet",
- "KeySpecifications": [
- {
- "BeginSearch": {
- "TypeDiscriminator": "BeginSearchIndex",
- "Index": 1
- },
- "FindKeys": {
- "TypeDiscriminator": "FindKeysRange",
- "LastKey": 0,
- "KeyStep": 1,
- "Limit": 0
- },
- "Flags": "RO"
- }
- ]
- },
{
"Command": "CLUSTER_BUMPEPOCH",
"Name": "CLUSTER|BUMPEPOCH",
@@ -837,6 +800,17 @@
"nondeterministic_output"
]
},
+ {
+ "Command": "CLUSTER_PUBLISH",
+ "Name": "CLUSTER|PUBLISH",
+ "IsInternal": true,
+ "Arity": 4,
+ "Flags": "Loading, NoScript, PubSub, Stale",
+ "FirstKey": 1,
+ "LastKey": 1,
+ "Step": 1,
+ "AclCategories": "Admin, PubSub, Slow, Garnet"
+ },
{
"Command": "CLUSTER_REPLICAS",
"Name": "CLUSTER|REPLICAS",
@@ -926,6 +900,32 @@
"Arity": 1,
"Flags": "Admin, NoMulti, NoScript",
"AclCategories": "Admin, Dangerous, Slow, Garnet"
+ },
+ {
+ "Command": "CLUSTER_SPUBLISH",
+ "Name": "CLUSTER|SPUBLISH",
+ "IsInternal": true,
+ "Arity": 4,
+ "Flags": "Loading, NoScript, PubSub, Stale",
+ "FirstKey": 1,
+ "LastKey": 1,
+ "Step": 1,
+ "AclCategories": "Admin, PubSub, Slow, Garnet",
+ "KeySpecifications": [
+ {
+ "BeginSearch": {
+ "TypeDiscriminator": "BeginSearchIndex",
+ "Index": 1
+ },
+ "FindKeys": {
+ "TypeDiscriminator": "FindKeysRange",
+ "LastKey": 0,
+ "KeyStep": 1,
+ "Limit": 0
+ },
+ "Flags": "RO"
+ }
+ ]
}
]
},
@@ -956,16 +956,6 @@
"nondeterministic_output_order"
]
},
- {
- "Command": "COMMAND_INFO",
- "Name": "COMMAND|INFO",
- "Arity": -2,
- "Flags": "Loading, Stale",
- "AclCategories": "Connection, Slow",
- "Tips": [
- "nondeterministic_output_order"
- ]
- },
{
"Command": "COMMAND_GETKEYS",
"Name": "COMMAND|GETKEYS",
@@ -979,6 +969,16 @@
"Arity": -3,
"Flags": "Loading, Stale",
"AclCategories": "Connection, Slow"
+ },
+ {
+ "Command": "COMMAND_INFO",
+ "Name": "COMMAND|INFO",
+ "Arity": -2,
+ "Flags": "Loading, Stale",
+ "AclCategories": "Connection, Slow",
+ "Tips": [
+ "nondeterministic_output_order"
+ ]
}
]
},
@@ -1082,7 +1082,7 @@
"Command": "DEBUG",
"Name": "DEBUG",
"Arity": -2,
- "Flags": "Admin, Noscript, Loading, Stale",
+ "Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Dangerous, Slow"
},
{
@@ -1179,7 +1179,10 @@
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
- "AclCategories": "KeySpace, Read",
+ "AclCategories": "KeySpace, Read, Slow",
+ "Tips": [
+ "nondeterministic_output"
+ ],
"KeySpecifications": [
{
"BeginSearch": {
@@ -3569,13 +3572,20 @@
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
- "KeyStep": 0,
+ "KeyStep": 1,
"Limit": 0
},
"Flags": "OW, Update"
}
]
},
+ {
+ "Command": "ROLE",
+ "Name": "ROLE",
+ "Arity": 1,
+ "Flags": "Fast, Loading, NoScript, Stale",
+ "AclCategories": "Admin, Dangerous, Fast"
+ },
{
"Command": "RPOP",
"Name": "RPOP",
@@ -4110,7 +4120,7 @@
"KeyStep": 1,
"Limit": 0
},
- "Flags": "RW, Update"
+ "Flags": "OW, Update"
},
{
"BeginSearch": {
@@ -4327,6 +4337,31 @@
}
]
},
+ {
+ "Command": "SPUBLISH",
+ "Name": "SPUBLISH",
+ "Arity": 3,
+ "Flags": "Loading, NoScript, PubSub, Stale",
+ "FirstKey": 1,
+ "LastKey": 1,
+ "Step": 1,
+ "AclCategories": "PubSub, Read, Slow",
+ "KeySpecifications": [
+ {
+ "BeginSearch": {
+ "TypeDiscriminator": "BeginSearchIndex",
+ "Index": 1
+ },
+ "FindKeys": {
+ "TypeDiscriminator": "FindKeysRange",
+ "LastKey": 0,
+ "KeyStep": 1,
+ "Limit": 0
+ },
+ "Flags": "RO"
+ }
+ ]
+ },
{
"Command": "SRANDMEMBER",
"Name": "SRANDMEMBER",
@@ -4408,38 +4443,6 @@
}
]
},
- {
- "Command": "STRLEN",
- "Name": "STRLEN",
- "Arity": 2,
- "Flags": "Fast, ReadOnly",
- "FirstKey": 1,
- "LastKey": 1,
- "Step": 1,
- "AclCategories": "Fast, Read, String",
- "KeySpecifications": [
- {
- "BeginSearch": {
- "TypeDiscriminator": "BeginSearchIndex",
- "Index": 1
- },
- "FindKeys": {
- "TypeDiscriminator": "FindKeysRange",
- "LastKey": 0,
- "KeyStep": 1,
- "Limit": 0
- },
- "Flags": "RO"
- }
- ]
- },
- {
- "Command": "SUBSCRIBE",
- "Name": "SUBSCRIBE",
- "Arity": -2,
- "Flags": "Loading, NoScript, PubSub, Stale",
- "AclCategories": "PubSub, Slow"
- },
{
"Command": "SSUBSCRIBE",
"Name": "SSUBSCRIBE",
@@ -4448,7 +4451,7 @@
"FirstKey": 1,
"LastKey": -1,
"Step": 1,
- "AclCategories": "PubSub, Slow, Read",
+ "AclCategories": "PubSub, Read, Slow",
"KeySpecifications": [
{
"BeginSearch": {
@@ -4466,14 +4469,14 @@
]
},
{
- "Command": "SPUBLISH",
- "Name": "SPUBLISH",
- "Arity": 3,
- "Flags": "Loading, NoScript, PubSub, Stale",
+ "Command": "STRLEN",
+ "Name": "STRLEN",
+ "Arity": 2,
+ "Flags": "Fast, ReadOnly",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
- "AclCategories": "PubSub, Slow, Read",
+ "AclCategories": "Fast, Read, String",
"KeySpecifications": [
{
"BeginSearch": {
@@ -4490,6 +4493,13 @@
}
]
},
+ {
+ "Command": "SUBSCRIBE",
+ "Name": "SUBSCRIBE",
+ "Arity": -2,
+ "Flags": "Loading, NoScript, PubSub, Stale",
+ "AclCategories": "PubSub, Slow"
+ },
{
"Command": "SUBSTR",
"Name": "SUBSTR",
@@ -4644,13 +4654,6 @@
}
]
},
- {
- "Command": "ROLE",
- "Name": "ROLE",
- "Arity": 1,
- "Flags": "NoScript, Loading, Stale, Fast",
- "AclCategories": "Admin, Fast, Dangerous"
- },
{
"Command": "UNLINK",
"Name": "UNLINK",
diff --git a/libs/server/Cluster/IClusterFactory.cs b/libs/server/Cluster/IClusterFactory.cs
index 675afe876d..d0496596f1 100644
--- a/libs/server/Cluster/IClusterFactory.cs
+++ b/libs/server/Cluster/IClusterFactory.cs
@@ -14,7 +14,7 @@ public interface IClusterFactory
///
/// Create checkpoint manager
///
- DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default);
+ DeviceLogCommitCheckpointManager CreateCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, ICheckpointNamingScheme checkpointNamingScheme, bool isMainStore, ILogger logger = default);
///
/// Create cluster provider
diff --git a/libs/server/Resp/Bitmap/BitmapManagerBitCount.cs b/libs/server/Resp/Bitmap/BitmapManagerBitCount.cs
index 73857ee1f9..44eb959b6e 100644
--- a/libs/server/Resp/Bitmap/BitmapManagerBitCount.cs
+++ b/libs/server/Resp/Bitmap/BitmapManagerBitCount.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT license.
using System.Diagnostics;
+using System.Numerics;
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;
@@ -95,11 +96,12 @@ public static long BitCountDriver(long startOffset, long endOffset, byte offsetT
if (endOffset - startOffset < 128)
count += __scalar_popc(value, startOffset, endOffset);
- else
- if (Avx2.IsSupported)
+ else if (Avx2.IsSupported)
count += __simd_popcX256(value, startOffset, endOffset);
- else
+ else if (Ssse3.IsSupported)
count += __simd_popcX128(value, startOffset, endOffset);
+ else
+ count += __scalar_popc(value, startOffset, endOffset);
return count;
}
@@ -122,10 +124,10 @@ private static long __scalar_popc(byte* bitmap, long start, long end)
#region popc_4x8
while (curr < vend)
{
- ulong v00 = Popcnt.X64.PopCount(*(ulong*)(curr));
- ulong v01 = Popcnt.X64.PopCount(*(ulong*)(curr + 8));
- ulong v02 = Popcnt.X64.PopCount(*(ulong*)(curr + 16));
- ulong v03 = Popcnt.X64.PopCount(*(ulong*)(curr + 24));
+ ulong v00 = (ulong)BitOperations.PopCount(*(ulong*)(curr));
+ ulong v01 = (ulong)BitOperations.PopCount(*(ulong*)(curr + 8));
+ ulong v02 = (ulong)BitOperations.PopCount(*(ulong*)(curr + 16));
+ ulong v03 = (ulong)BitOperations.PopCount(*(ulong*)(curr + 24));
v00 = v00 + v01;
v02 = v02 + v03;
count += v00 + v02;
@@ -140,7 +142,7 @@ private static long __scalar_popc(byte* bitmap, long start, long end)
vend = curr + (len - (len & tail));
while (curr < vend)
{
- count += Popcnt.X64.PopCount(*(ulong*)(curr));
+ count += (ulong)BitOperations.PopCount(*(ulong*)(curr));
curr += batchSize;
}
@@ -152,7 +154,7 @@ private static long __scalar_popc(byte* bitmap, long start, long end)
if (tail >= 3) tt |= (ulong)(((ulong)curr[2]) << 16);
if (tail >= 2) tt |= (ulong)(((ulong)curr[1]) << 8);
if (tail >= 1) tt |= (ulong)(((ulong)curr[0]));
- count += Popcnt.X64.PopCount(tt);
+ count += (ulong)BitOperations.PopCount(tt);
#endregion
return (long)count;
diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs
index d3dd7b777a..36a2ff3866 100644
--- a/libs/server/Servers/GarnetServerOptions.cs
+++ b/libs/server/Servers/GarnetServerOptions.cs
@@ -277,9 +277,9 @@ public class GarnetServerOptions : ServerOptions
public int NetworkConnectionLimit = -1;
///
- /// Creator of device factories
+ /// Instance of interface to create named device factories
///
- public Func DeviceFactoryCreator = null;
+ public INamedDeviceFactoryCreator DeviceFactoryCreator = null;
///
/// Whether and by how much should we throttle the disk IO for checkpoints (default = 0)
@@ -500,7 +500,7 @@ public KVSettings GetSettings(ILoggerFactory loggerFactory,
}
logger?.LogInformation("[Store] Using log mutable percentage of {MutablePercent}%", MutablePercent);
- DeviceFactoryCreator ??= () => new LocalStorageNamedDeviceFactory(useNativeDeviceLinux: UseNativeDeviceLinux, logger: logger);
+ DeviceFactoryCreator ??= new LocalStorageNamedDeviceFactoryCreator(useNativeDeviceLinux: UseNativeDeviceLinux, logger: logger);
if (LatencyMonitor && MetricsSamplingFrequency == 0)
throw new Exception("LatencyMonitor requires MetricsSamplingFrequency to be set");
@@ -740,7 +740,7 @@ public void GetAofSettings(out TsavoriteLogSettings tsavoriteLogSettings)
throw new Exception("AOF Page size cannot be more than the AOF memory size.");
}
tsavoriteLogSettings.LogCommitManager = new DeviceLogCommitCheckpointManager(
- MainMemoryReplication ? new NullNamedDeviceFactory() : DeviceFactoryCreator(),
+ MainMemoryReplication ? new NullNamedDeviceFactoryCreator() : DeviceFactoryCreator,
new DefaultCheckpointNamingScheme(CheckpointDir + "/AOF"),
removeOutdated: true,
fastCommitThrottleFreq: EnableFastCommit ? FastCommitThrottleFreq : 0);
@@ -753,9 +753,7 @@ public void GetAofSettings(out TsavoriteLogSettings tsavoriteLogSettings)
///
public INamedDeviceFactory GetInitializedDeviceFactory(string baseName)
{
- var deviceFactory = GetDeviceFactory();
- deviceFactory.Initialize(baseName);
- return deviceFactory;
+ return DeviceFactoryCreator.Create(baseName);
}
///
@@ -834,11 +832,5 @@ IDevice GetAofDevice()
if (UseAofNullDevice) return new NullDevice();
else return GetInitializedDeviceFactory(CheckpointDir).Get(new FileDescriptor("AOF", "aof.log"));
}
-
- ///
- /// Get device factory
- ///
- ///
- public INamedDeviceFactory GetDeviceFactory() => DeviceFactoryCreator();
}
}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
index fc732747b3..e137bcd9aa 100644
--- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
@@ -245,7 +245,7 @@ private protected bool ScanLookup 0)
bContext.CompletePending(wait: true);
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
index 38df35ab13..486efe368f 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs
@@ -506,8 +506,9 @@ public bool IterateLookup(ref TScanFunctions scanFunctions, long
/// TailAddress). A snapshot can be taken by calling ShiftReadOnlyToTail() and then using that TailAddress as endAddress and maxAddress.
/// If true, validate that the cursor is on a valid address boundary, and snap it to the highest lower address if it is not.
/// Maximum address for determining liveness, records after this address are not considered when checking validity.
- /// True if Scan completed and pushed records; false if Scan ended early due to finding less than records
- /// or one of the TScanIterator reader functions returning false
+ /// True if Scan completed and pushed records and there may be more records; false if Scan ended early due to finding less than records
+ /// or one of the TScanIterator reader functions returning false, or if we determined that there are no records remaining. In other words, if this returns true,
+ /// there may be more records satisfying the iteration criteria beyond .
public bool ScanCursor(ref long cursor, long count, TScanFunctions scanFunctions, long endAddress = long.MaxValue, bool validateCursor = false, long maxAddress = long.MaxValue)
where TScanFunctions : IScanIteratorFunctions
=> store.hlogBase.ScanCursor(store, scanCursorState ??= new(), ref cursor, count, scanFunctions, endAddress, validateCursor, maxAddress);
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
index 90638842ab..ed4137bd95 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
@@ -49,15 +49,15 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
///
/// Create new instance of log commit manager
///
- /// Factory for getting devices
+ /// Factory for getting devices
/// Checkpoint naming helper
/// Remote older Tsavorite log commits
/// FastCommit throttle frequency - use only in FastCommit mode
/// Remote older Tsavorite log commits
- public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, int fastCommitThrottleFreq = 0, ILogger logger = null)
+ public DeviceLogCommitCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, int fastCommitThrottleFreq = 0, ILogger logger = null)
{
this.logger = logger;
- this.deviceFactory = deviceFactory;
+ this.deviceFactory = deviceFactoryCreator.Create(checkpointNamingScheme.BaseName);
this.checkpointNamingScheme = checkpointNamingScheme;
this.fastCommitThrottleFreq = fastCommitThrottleFreq;
@@ -76,7 +76,6 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, IChec
// // We only keep the latest TsavoriteLog commit
flogCommitHistory = new long[flogCommitCount];
}
- deviceFactory.Initialize(checkpointNamingScheme.BaseName);
}
///
@@ -96,12 +95,12 @@ public void Purge(Guid token)
///
/// Create new instance of log commit manager
///
- /// Factory for getting devices
+ /// Creator of factory for getting devices
/// Overall location specifier (e.g., local path or cloud container name)
/// Remote older Tsavorite log commits
/// Remote older Tsavorite log commits
- public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, string baseName, bool removeOutdated = false, ILogger logger = null)
- : this(deviceFactory, new DefaultCheckpointNamingScheme(baseName), removeOutdated)
+ public DeviceLogCommitCheckpointManager(INamedDeviceFactoryCreator deviceFactoryCreator, string baseName, bool removeOutdated = false, ILogger logger = null)
+ : this(deviceFactoryCreator, new DefaultCheckpointNamingScheme(baseName), removeOutdated)
{
this.logger = logger;
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactory.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactory.cs
index 383daefec4..8cd2f95d4b 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactory.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactory.cs
@@ -6,16 +6,10 @@
namespace Tsavorite.core
{
///
- /// Factory for getting IDevice instances for checkpointing
+ /// Factory for getting IDevice instances for checkpointing. The factory is specific to a particular base path or container.
///
public interface INamedDeviceFactory
{
- ///
- /// Initialize base name or container
- ///
- /// Base name or container
- void Initialize(string baseName);
-
///
/// Get IDevice instance for given file info
///
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactoryCreator.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactoryCreator.cs
new file mode 100644
index 0000000000..14d7048612
--- /dev/null
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/INamedDeviceFactoryCreator.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+namespace Tsavorite.core
+{
+ ///
+ /// Factory creator for getting IDevice instances for checkpointing
+ ///
+ public interface INamedDeviceFactoryCreator
+ {
+ ///
+ /// Create factory for creating IDevice instances, for the given base name or container
+ ///
+ /// Base name or container
+ ///
+ INamedDeviceFactory Create(string baseName);
+ }
+}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactory.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactory.cs
index 8d8e8d6f00..2fbcc066ef 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactory.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactory.cs
@@ -15,7 +15,7 @@ namespace Tsavorite.core
///
public class LocalStorageNamedDeviceFactory : INamedDeviceFactory
{
- string baseName;
+ readonly string baseName;
readonly bool deleteOnClose;
readonly int? throttleLimit;
readonly bool preallocateFile;
@@ -32,8 +32,10 @@ public class LocalStorageNamedDeviceFactory : INamedDeviceFactory
/// Whether file buffering (during write) is disabled (default of true requires aligned writes)
/// Throttle limit (max number of pending I/Os) for this device instance
/// Use native device on Linux
- ///
- public LocalStorageNamedDeviceFactory(bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, int? throttleLimit = null, bool useNativeDeviceLinux = false, bool readOnly = false, ILogger logger = null)
+ /// Whether files are opened as readonly
+ /// Base name
+ /// Logger
+ public LocalStorageNamedDeviceFactory(bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, int? throttleLimit = null, bool useNativeDeviceLinux = false, bool readOnly = false, string baseName = null, ILogger logger = null)
{
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
@@ -41,13 +43,8 @@ public LocalStorageNamedDeviceFactory(bool preallocateFile = false, bool deleteO
this.throttleLimit = throttleLimit;
this.useNativeDeviceLinux = useNativeDeviceLinux;
this.readOnly = readOnly;
- this.logger = logger;
- }
-
- ///
- public void Initialize(string baseName)
- {
this.baseName = baseName;
+ this.logger = logger;
}
///
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactoryCreator.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactoryCreator.cs
new file mode 100644
index 0000000000..63265edb11
--- /dev/null
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/LocalStorageNamedDeviceFactoryCreator.cs
@@ -0,0 +1,47 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using Microsoft.Extensions.Logging;
+
+namespace Tsavorite.core
+{
+ ///
+ /// Local storage named device factory creator
+ ///
+ public class LocalStorageNamedDeviceFactoryCreator : INamedDeviceFactoryCreator
+ {
+ readonly bool preallocateFile;
+ readonly bool deleteOnClose;
+ readonly int? throttleLimit;
+ readonly bool disableFileBuffering;
+ readonly bool useNativeDeviceLinux;
+ readonly bool readOnly;
+ readonly ILogger logger;
+
+ ///
+ /// Create instance of factory
+ ///
+ /// Whether files should be preallocated
+ /// Whether file should be deleted on close
+ /// Whether file buffering (during write) is disabled (default of true requires aligned writes)
+ /// Throttle limit (max number of pending I/Os) for this device instance
+ /// Use native device on Linux
+ /// Whether files are opened as readonly
+ /// Logger
+ public LocalStorageNamedDeviceFactoryCreator(bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, int? throttleLimit = null, bool useNativeDeviceLinux = false, bool readOnly = false, ILogger logger = null)
+ {
+ this.preallocateFile = preallocateFile;
+ this.deleteOnClose = deleteOnClose;
+ this.disableFileBuffering = disableFileBuffering;
+ this.throttleLimit = throttleLimit;
+ this.useNativeDeviceLinux = useNativeDeviceLinux;
+ this.readOnly = readOnly;
+ this.logger = logger;
+ }
+
+ public INamedDeviceFactory Create(string baseName)
+ {
+ return new LocalStorageNamedDeviceFactory(preallocateFile, deleteOnClose, disableFileBuffering, throttleLimit, useNativeDeviceLinux, readOnly, baseName, logger);
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactory.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactory.cs
index f4d42f86fe..4af3c099e1 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactory.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactory.cs
@@ -6,7 +6,7 @@
namespace Tsavorite.core
{
///
- /// Local storage device factory
+ /// Null device factory
///
public class NullNamedDeviceFactory : INamedDeviceFactory
{
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactoryCreator.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactoryCreator.cs
new file mode 100644
index 0000000000..b230cdea57
--- /dev/null
+++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/NullNamedDeviceFactoryCreator.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+namespace Tsavorite.core
+{
+ ///
+ /// Creator of factory for getting null device instances
+ ///
+ public class NullNamedDeviceFactoryCreator : INamedDeviceFactoryCreator
+ {
+ static readonly NullNamedDeviceFactory nullNamedDeviceFactory = new();
+
+ public INamedDeviceFactory Create(string baseName)
+ {
+ return nullNamedDeviceFactory;
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs
index 2d0b56fb76..b6c479f673 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs
@@ -281,6 +281,7 @@ internal OperationStatus ContinuePendingConditionalCopyToTail
{
// If the key was found at or above minAddress, do nothing.
+ // If we're here we know the key matches because AllocatorBase.AsyncGetFromDiskCallback skips colliding keys by following the .PreviousAddress chain.
if (request.logicalAddress >= pendingContext.minAddress)
return OperationStatus.SUCCESS;
@@ -337,6 +338,7 @@ internal OperationStatus ContinuePendingConditionalScanPush
{
// If the key was found at or above minAddress, do nothing; we'll push it when we get to it. If we flagged the iteration to stop, do nothing.
+ // If we're here we know the key matches because AllocatorBase.AsyncGetFromDiskCallback skips colliding keys by following the .PreviousAddress chain.
if (request.logicalAddress >= pendingContext.minAddress || pendingContext.scanCursorState.stop)
return OperationStatus.SUCCESS;
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
index 111cfbc7aa..e57be1d8bf 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs
@@ -106,7 +106,7 @@ public TsavoriteKV(KVSettings kvSettings, TStoreFunctions storeFun
checkpointManager = checkpointSettings.CheckpointManager ??
new DeviceLogCommitCheckpointManager
- (new LocalStorageNamedDeviceFactory(),
+ (new LocalStorageNamedDeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(
new DirectoryInfo(checkpointSettings.CheckpointDir ?? ".").FullName), removeOutdated: checkpointSettings.RemoveOutdated);
diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
index 59302dd37d..9cacbed79a 100644
--- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
+++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
@@ -190,7 +190,7 @@ private TsavoriteLog(TsavoriteLogSettings logSettings, bool syncRecover, ILogger
AutoCommit = logSettings.AutoCommit;
logCommitManager = logSettings.LogCommitManager ??
new DeviceLogCommitCheckpointManager
- (new LocalStorageNamedDeviceFactory(),
+ (new LocalStorageNamedDeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(
logSettings.LogCommitDir ??
new FileInfo(logSettings.LogDevice.FileName).Directory.FullName),
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactory.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactory.cs
index 2bf672f308..3ee90cb997 100644
--- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactory.cs
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactory.cs
@@ -3,7 +3,6 @@
using System.Collections.Generic;
using System.Linq;
-using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
@@ -14,51 +13,26 @@ namespace Tsavorite.devices
///
public class AzureStorageNamedDeviceFactory : INamedDeviceFactory
{
- readonly BlobUtilsV12.ServiceClients pageBlobAccount;
- BlobUtilsV12.ContainerClients pageBlobContainer;
- BlobUtilsV12.BlobDirectory pageBlobDirectory;
readonly ILogger logger;
+ readonly BlobUtilsV12.ContainerClients pageBlobContainer;
+ readonly BlobUtilsV12.BlobDirectory pageBlobDirectory;
- ///
- /// Create instance of factory for Azure devices
- ///
- ///
- ///
- public AzureStorageNamedDeviceFactory(string connectionString, ILogger logger = null)
- : this(BlobUtilsV12.GetServiceClients(connectionString), logger)
- {
- }
-
- ///
- /// Create instance of factory for Azure devices
- ///
- ///
- ///
- AzureStorageNamedDeviceFactory(BlobUtilsV12.ServiceClients pageBlobAccount, ILogger logger = null)
+ ///
+ internal AzureStorageNamedDeviceFactory(string baseName, BlobUtilsV12.ServiceClients pageBlobAccount, ILogger logger)
{
- this.pageBlobAccount = pageBlobAccount;
this.logger = logger;
- }
- ///
- public void Initialize(string baseName)
- => InitializeAsync(baseName).GetAwaiter().GetResult();
-
-
- async Task InitializeAsync(string baseName)
- {
var path = baseName.Split('/');
var containerName = path[0];
var dirName = string.Join('/', path.Skip(1));
pageBlobContainer = BlobUtilsV12.GetContainerClients(pageBlobAccount, containerName);
- if (!await pageBlobContainer.WithRetries.ExistsAsync())
- await pageBlobContainer.WithRetries.CreateIfNotExistsAsync();
+ if (!pageBlobContainer.WithRetries.ExistsAsync().GetAwaiter().GetResult())
+ _ = pageBlobContainer.WithRetries.CreateIfNotExistsAsync().GetAwaiter().GetResult();
pageBlobDirectory = new BlobUtilsV12.BlobDirectory(pageBlobContainer, dirName);
}
-
///
public void Delete(FileDescriptor fileInfo)
{
diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactoryCreator.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactoryCreator.cs
new file mode 100644
index 0000000000..0ace2cb9a1
--- /dev/null
+++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageNamedDeviceFactoryCreator.cs
@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using Microsoft.Extensions.Logging;
+using Tsavorite.core;
+
+namespace Tsavorite.devices
+{
+ public class AzureStorageNamedDeviceFactoryCreator : INamedDeviceFactoryCreator
+ {
+ readonly BlobUtilsV12.ServiceClients pageBlobAccount;
+ readonly ILogger logger;
+
+ ///
+ /// Creator of factory for Azure devices
+ ///
+ ///
+ ///
+ public AzureStorageNamedDeviceFactoryCreator(string connectionString, ILogger logger = null)
+ : this(BlobUtilsV12.GetServiceClients(connectionString), logger)
+ {
+ }
+
+ ///
+ /// Creator of factory for Azure devices
+ ///
+ ///
+ ///
+ AzureStorageNamedDeviceFactoryCreator(BlobUtilsV12.ServiceClients pageBlobAccount, ILogger logger = null)
+ {
+ this.pageBlobAccount = pageBlobAccount;
+ this.logger = logger;
+ }
+
+ public INamedDeviceFactory Create(string baseName)
+ {
+ return new AzureStorageNamedDeviceFactory(baseName, pageBlobAccount, logger);
+ }
+ }
+}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/test/CheckpointManagerTests.cs b/libs/storage/Tsavorite/cs/test/CheckpointManagerTests.cs
index 38f5d00693..6c21404529 100644
--- a/libs/storage/Tsavorite/cs/test/CheckpointManagerTests.cs
+++ b/libs/storage/Tsavorite/cs/test/CheckpointManagerTests.cs
@@ -30,14 +30,14 @@ public async Task CheckpointManagerPurgeCheck([Values] DeviceMode deviceMode)
if (deviceMode == DeviceMode.Local)
{
checkpointManager = new DeviceLogCommitCheckpointManager(
- new LocalStorageNamedDeviceFactory(),
+ new LocalStorageNamedDeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(Path.Join(TestUtils.MethodTestDir, "checkpoints")), false); // PurgeAll deletes this directory
}
else
{
TestUtils.IgnoreIfNotRunningAzureTests();
checkpointManager = new DeviceLogCommitCheckpointManager(
- new AzureStorageNamedDeviceFactory(TestUtils.AzureEmulatedStorageString),
+ TestUtils.AzureStorageNamedDeviceFactoryCreator,
new AzureCheckpointNamingScheme($"{TestUtils.AzureTestContainer}/{TestUtils.AzureTestDirectory}"), false);
}
diff --git a/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs b/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs
index d502fbd46d..304691e2e8 100644
--- a/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs
+++ b/libs/storage/Tsavorite/cs/test/DeltaLogTests.cs
@@ -79,7 +79,6 @@ public void DeltaLogTest1([Values] TestUtils.DeviceType deviceType)
for (int j = 0; j < len; j++)
{
unsafe { ClassicAssert.AreEqual((byte)_len, *(byte*)(address + j)); }
- ;
}
}
ClassicAssert.AreEqual(TotalCount, i, $"i={i} and TotalCount={TotalCount}");
diff --git a/libs/storage/Tsavorite/cs/test/DeviceLogTests.cs b/libs/storage/Tsavorite/cs/test/DeviceLogTests.cs
index 9cb35c2303..ae65d8200a 100644
--- a/libs/storage/Tsavorite/cs/test/DeviceLogTests.cs
+++ b/libs/storage/Tsavorite/cs/test/DeviceLogTests.cs
@@ -28,7 +28,7 @@ public async ValueTask PageBlobTsavoriteLogTest1([Values] LogChecksumType logChe
TestUtils.IgnoreIfNotRunningAzureTests();
var device = new AzureStorageDevice(TestUtils.AzureEmulatedStorageString, TestUtils.AzureTestContainer, TestUtils.AzureTestDirectory, "Tsavoritelog.log", deleteOnClose: true, logger: TestUtils.TestLoggerFactory.CreateLogger("asd"));
var checkpointManager = new DeviceLogCommitCheckpointManager(
- new AzureStorageNamedDeviceFactory(TestUtils.AzureEmulatedStorageString),
+ TestUtils.AzureStorageNamedDeviceFactoryCreator,
new AzureCheckpointNamingScheme($"{TestUtils.AzureTestContainer}/{TestUtils.AzureTestDirectory}"));
await TsavoriteLogTest1(logChecksum, device, checkpointManager, iteratorType);
device.Dispose();
@@ -43,7 +43,7 @@ public async ValueTask PageBlobTsavoriteLogTestWithLease([Values] LogChecksumTyp
TestUtils.IgnoreIfNotRunningAzureTests();
var device = new AzureStorageDevice(TestUtils.AzureEmulatedStorageString, TestUtils.AzureTestContainer, TestUtils.AzureTestDirectory, "TsavoritelogLease.log", deleteOnClose: true, underLease: true, blobManager: null, logger: TestUtils.TestLoggerFactory.CreateLogger("asd"));
var checkpointManager = new DeviceLogCommitCheckpointManager(
- new AzureStorageNamedDeviceFactory(TestUtils.AzureEmulatedStorageString),
+ TestUtils.AzureStorageNamedDeviceFactoryCreator,
new AzureCheckpointNamingScheme($"{TestUtils.AzureTestContainer}/{TestUtils.AzureTestDirectory}"));
await TsavoriteLogTest1(logChecksum, device, checkpointManager, iteratorType);
device.Dispose();
diff --git a/libs/storage/Tsavorite/cs/test/LockableUnsafeContextTests.cs b/libs/storage/Tsavorite/cs/test/LockableUnsafeContextTests.cs
index 54e0452d30..31f63ce1ba 100644
--- a/libs/storage/Tsavorite/cs/test/LockableUnsafeContextTests.cs
+++ b/libs/storage/Tsavorite/cs/test/LockableUnsafeContextTests.cs
@@ -1337,7 +1337,6 @@ void updater(int key, int iter)
Assert.Fail($"Unexpected updateOp {updateOp}");
return;
}
- ;
ClassicAssert.IsFalse(status.IsFaulted, $"Unexpected UpdateOp {updateOp}, status {status}");
}
catch (Exception)
diff --git a/libs/storage/Tsavorite/cs/test/LogResumeTests.cs b/libs/storage/Tsavorite/cs/test/LogResumeTests.cs
index a030915cd9..5abbae3742 100644
--- a/libs/storage/Tsavorite/cs/test/LogResumeTests.cs
+++ b/libs/storage/Tsavorite/cs/test/LogResumeTests.cs
@@ -106,7 +106,7 @@ public async Task TsavoriteLogResumePersistedReader2([Values] LogChecksumType lo
var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
var input3 = new byte[] { 11, 12 };
- using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir), removeOutdated))
+ using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactoryCreator(), new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir), removeOutdated))
{
using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
{
@@ -145,7 +145,7 @@ public async Task TsavoriteLogResumePersistedReader3([Values] LogChecksumType lo
var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
var input3 = new byte[] { 11, 12 };
- using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir), removeOutdated))
+ using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactoryCreator(), new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir), removeOutdated))
{
using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
{
diff --git a/libs/storage/Tsavorite/cs/test/LogTests.cs b/libs/storage/Tsavorite/cs/test/LogTests.cs
index b8b48714eb..40c6eea941 100644
--- a/libs/storage/Tsavorite/cs/test/LogTests.cs
+++ b/libs/storage/Tsavorite/cs/test/LogTests.cs
@@ -82,7 +82,7 @@ protected void BaseSetup(bool deleteOnClose = true)
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
manager = new DeviceLogCommitCheckpointManager(
- new LocalStorageNamedDeviceFactory(deleteOnClose: deleteOnClose),
+ new LocalStorageNamedDeviceFactoryCreator(deleteOnClose: deleteOnClose),
new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir), false);
this.deleteOnClose = deleteOnClose;
}
diff --git a/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs b/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs
index 6bfe90fde8..4b7c1cd6c3 100644
--- a/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs
+++ b/libs/storage/Tsavorite/cs/test/RecoveryChecks.cs
@@ -814,14 +814,14 @@ public async ValueTask IncrSnapshotRecoveryCheck([Values] DeviceMode deviceMode)
if (deviceMode == DeviceMode.Local)
{
checkpointManager = new DeviceLogCommitCheckpointManager(
- new LocalStorageNamedDeviceFactory(),
+ new LocalStorageNamedDeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(TestUtils.MethodTestDir + "/checkpoints/")); // PurgeAll deletes this directory
}
else
{
TestUtils.IgnoreIfNotRunningAzureTests();
checkpointManager = new DeviceLogCommitCheckpointManager(
- new AzureStorageNamedDeviceFactory(TestUtils.AzureEmulatedStorageString),
+ TestUtils.AzureStorageNamedDeviceFactoryCreator,
new AzureCheckpointNamingScheme($"{TestUtils.AzureTestContainer}/{TestUtils.AzureTestDirectory}"));
}
diff --git a/libs/storage/Tsavorite/cs/test/ReproReadCacheTest.cs b/libs/storage/Tsavorite/cs/test/ReproReadCacheTest.cs
index dbacbc5332..280f6b3938 100644
--- a/libs/storage/Tsavorite/cs/test/ReproReadCacheTest.cs
+++ b/libs/storage/Tsavorite/cs/test/ReproReadCacheTest.cs
@@ -79,7 +79,6 @@ public void Setup()
kvSettings.ReadCacheSecondChanceFraction = 0.1;
kvSettings.ReadCacheEnabled = true;
}
- ;
continue;
}
if (arg is DeviceType deviceType)
diff --git a/libs/storage/Tsavorite/cs/test/SharedDirectoryTests.cs b/libs/storage/Tsavorite/cs/test/SharedDirectoryTests.cs
index d060378032..8794cc71bf 100644
--- a/libs/storage/Tsavorite/cs/test/SharedDirectoryTests.cs
+++ b/libs/storage/Tsavorite/cs/test/SharedDirectoryTests.cs
@@ -196,7 +196,7 @@ private static void Test(TsavoriteTestInstance tsavoriteInstance, Guid checkpoin
var checkpointInfo = default(HybridLogRecoveryInfo);
checkpointInfo.Recover(checkpointToken,
new DeviceLogCommitCheckpointManager(
- new LocalStorageNamedDeviceFactory(),
+ new LocalStorageNamedDeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(
new DirectoryInfo(tsavoriteInstance.CheckpointDirectory).FullName)));
diff --git a/libs/storage/Tsavorite/cs/test/SimpleRecoveryTest.cs b/libs/storage/Tsavorite/cs/test/SimpleRecoveryTest.cs
index 0ca58a3bd5..12791eab5c 100644
--- a/libs/storage/Tsavorite/cs/test/SimpleRecoveryTest.cs
+++ b/libs/storage/Tsavorite/cs/test/SimpleRecoveryTest.cs
@@ -65,7 +65,7 @@ public async ValueTask PageBlobSimpleRecoveryTest(
{
IgnoreIfNotRunningAzureTests();
checkpointManager = new DeviceLogCommitCheckpointManager(
- new AzureStorageNamedDeviceFactory(AzureEmulatedStorageString),
+ TestUtils.AzureStorageNamedDeviceFactoryCreator,
new AzureCheckpointNamingScheme($"{AzureTestContainer}/{AzureTestDirectory}"));
await SimpleRecoveryTest1_Worker(checkpointType, completionSyncMode, testCommitCookie);
checkpointManager.PurgeAll();
@@ -82,7 +82,7 @@ public async ValueTask LocalDeviceSimpleRecoveryTest(
[Values] bool testCommitCookie)
{
checkpointManager = new DeviceLogCommitCheckpointManager(
- new LocalStorageNamedDeviceFactory(),
+ new LocalStorageNamedDeviceFactoryCreator(),
new DefaultCheckpointNamingScheme(Path.Join(MethodTestDir, "chkpt")));
await SimpleRecoveryTest1_Worker(checkpointType, completionSyncMode, testCommitCookie);
checkpointManager.PurgeAll();
@@ -196,7 +196,7 @@ public async ValueTask SimpleRecoveryTest2(
[Values(CheckpointType.Snapshot, CheckpointType.FoldOver)] CheckpointType checkpointType,
[Values] CompletionSyncMode completionSyncMode)
{
- checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(Path.Join(MethodTestDir, "checkpoints4")), false);
+ checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactoryCreator(), new DefaultCheckpointNamingScheme(Path.Join(MethodTestDir, "checkpoints4")), false);
log = Devices.CreateLogDevice(Path.Join(MethodTestDir, "SimpleRecoveryTest2.log"), deleteOnClose: true);
store1 = new(new()
@@ -322,7 +322,7 @@ public async ValueTask ShouldRecoverBeginAddress([Values] CompletionSyncMode com
[Category("TsavoriteKV"), Category("CheckpointRestore")]
public async ValueTask SimpleReadAndUpdateInfoTest([Values] CompletionSyncMode completionSyncMode)
{
- checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(Path.Join(MethodTestDir, "checkpoints")), false);
+ checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactoryCreator(), new DefaultCheckpointNamingScheme(Path.Join(MethodTestDir, "checkpoints")), false);
log = Devices.CreateLogDevice(Path.Join(MethodTestDir, "SimpleReadAndUpdateInfoTest.log"), deleteOnClose: true);
store1 = new(new()
diff --git a/libs/storage/Tsavorite/cs/test/SpanByteIterationTests.cs b/libs/storage/Tsavorite/cs/test/SpanByteIterationTests.cs
index 109b31319c..9a379b7cf2 100644
--- a/libs/storage/Tsavorite/cs/test/SpanByteIterationTests.cs
+++ b/libs/storage/Tsavorite/cs/test/SpanByteIterationTests.cs
@@ -66,6 +66,26 @@ public readonly void OnException(Exception exception, long numberOfRecords) { }
public readonly void OnStop(bool completed, long numberOfRecords) { }
}
+ internal struct IterationCollisionTestFunctions : IScanIteratorFunctions
+ {
+ internal List keys;
+ public IterationCollisionTestFunctions() => keys = new();
+
+ public unsafe bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
+ {
+ keys.Add(*(long*)key.ToPointer());
+ cursorRecordResult = CursorRecordResult.Accept; // default; not used here
+ return true;
+ }
+
+ public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
+ => SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
+
+ public readonly bool OnStart(long beginAddress, long endAddress) => true;
+ public readonly void OnException(Exception exception, long numberOfRecords) { }
+ public readonly void OnStop(bool completed, long numberOfRecords) { }
+ }
+
[Test]
[Category(TsavoriteKVTestCategory)]
[Category(SmokeTestCategory)]
diff --git a/libs/storage/Tsavorite/cs/test/SpanByteLogScanTests.cs b/libs/storage/Tsavorite/cs/test/SpanByteLogScanTests.cs
index e93701eaff..788523d795 100644
--- a/libs/storage/Tsavorite/cs/test/SpanByteLogScanTests.cs
+++ b/libs/storage/Tsavorite/cs/test/SpanByteLogScanTests.cs
@@ -8,8 +8,8 @@
using NUnit.Framework;
using NUnit.Framework.Legacy;
using Tsavorite.core;
-using Tsavorite.test.Revivification;
using static Tsavorite.core.Utility;
+using static Tsavorite.test.SpanByteIterationTests;
using static Tsavorite.test.TestUtils;
namespace Tsavorite.test.spanbyte
@@ -43,6 +43,7 @@ internal class SpanByteLogScanTests
private IDevice log;
const int TotalRecords = 2000;
const int PageSizeBits = 15;
+ const int ComparerModulo = 100;
[SetUp]
public void Setup()
@@ -52,7 +53,7 @@ public void Setup()
{
if (arg is HashModulo mod && mod == HashModulo.Hundred)
{
- comparer = new SpanByteComparerModulo(100);
+ comparer = new SpanByteComparerModulo(ComparerModulo);
continue;
}
}
@@ -444,5 +445,39 @@ public unsafe void SpanByteJumpToBeginAddressTest()
ClassicAssert.AreEqual(expectedKey, int.Parse(MemoryMarshal.Cast(iter.GetValue().AsSpan())));
}
}
+
+ [Test]
+ [Category(TsavoriteKVTestCategory)]
+ [Category(IteratorCategory)]
+ [Category(SmokeTestCategory)]
+ public void SpanByteIterationPendingCollisionTest([Values(HashModulo.Hundred)] HashModulo hashMod)
+ {
+ using var session = store.NewSession(new VLVectorFunctions());
+ var bContext = session.BasicContext;
+ IterationCollisionTestFunctions scanIteratorFunctions = new();
+
+ const int totalRecords = 2000;
+ var start = store.Log.TailAddress;
+
+ // Note: We only have a single value element; we are not exercising the "Variable Length" aspect here.
+ Span keySpan = stackalloc long[1], valueSpan = stackalloc long[1];
+ SpanByte key = keySpan.AsSpanByte(), value = valueSpan.AsSpanByte();
+
+ // Initial population
+ for (int ii = 0; ii < totalRecords; ii++)
+ {
+ keySpan[0] = valueSpan[0] = ii;
+ _ = bContext.Upsert(ref key, ref value);
+ }
+
+ // Evict so we can test the pending scan push
+ store.Log.FlushAndEvict(wait: true);
+
+ long cursor = 0;
+ // Currently this returns false because there are some still-pending records when ScanLookup's GetNext loop ends (2000 is not an even multiple
+ // of 256, which is the CompletePending block size). If this returns true, it means the CompletePending block fired on the last valid record.
+ ClassicAssert.IsFalse(session.ScanCursor(ref cursor, totalRecords, scanIteratorFunctions), $"ScanCursor returned true even though all {scanIteratorFunctions.keys.Count} records were returned");
+ ClassicAssert.AreEqual(totalRecords, scanIteratorFunctions.keys.Count);
+ }
}
}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/test/TestUtils.cs b/libs/storage/Tsavorite/cs/test/TestUtils.cs
index 751e2b75b3..7e35045467 100644
--- a/libs/storage/Tsavorite/cs/test/TestUtils.cs
+++ b/libs/storage/Tsavorite/cs/test/TestUtils.cs
@@ -3,6 +3,7 @@
using System;
using System.IO;
+using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -28,6 +29,7 @@ public static class TestUtils
internal const string CheckpointRestoreCategory = "CheckpointRestore";
internal const string MallocFixedPageSizeCategory = "MallocFixedPageSize";
internal const string RMWTestCategory = "RMW";
+ internal const string IteratorCategory = "Iterator";
internal const string ModifiedBitTestCategory = "ModifiedBitTest";
internal const string RevivificationCategory = "Revivification";
@@ -197,6 +199,7 @@ internal static string AzureTestContainer
internal static string AzureTestDirectory => TestContext.CurrentContext.Test.MethodName;
internal const string AzureEmulatedStorageString = "UseDevelopmentStorage=true;";
+ internal static AzureStorageNamedDeviceFactoryCreator AzureStorageNamedDeviceFactoryCreator = IsRunningAzureTests ? new(AzureEmulatedStorageString) : null;
public enum AllocatorType
{
diff --git a/playground/CommandInfoUpdater/CommandInfoUpdater.cs b/playground/CommandInfoUpdater/CommandInfoUpdater.cs
index 521ab9e277..a96d70dda2 100644
--- a/playground/CommandInfoUpdater/CommandInfoUpdater.cs
+++ b/playground/CommandInfoUpdater/CommandInfoUpdater.cs
@@ -15,7 +15,7 @@ namespace CommandInfoUpdater
///
public class CommandInfoUpdater
{
- const int QUERY_CMD_BATCH_SIZE = 25;
+ const int QUERY_CMD_BATCH_SIZE = 10;
private static readonly string CommandInfoFileName = "RespCommandsInfo.json";
private static readonly string GarnetCommandInfoJsonPath = "GarnetCommandsInfo.json";
diff --git a/playground/CommandInfoUpdater/GarnetCommandsDocs.json b/playground/CommandInfoUpdater/GarnetCommandsDocs.json
index 84c80cea0e..bf0bd62dc9 100644
--- a/playground/CommandInfoUpdater/GarnetCommandsDocs.json
+++ b/playground/CommandInfoUpdater/GarnetCommandsDocs.json
@@ -186,6 +186,12 @@
"Summary": "Forces garbage collection.",
"Group": "Server"
},
+ {
+ "Command": "HCOLLECT",
+ "Name": "HCOLLECT",
+ "Summary": "Manually trigger deletion of expired fields from memory",
+ "Group": "Hash"
+ },
{
"Command": "SECONDARYOF",
"Name": "SECONDARYOF",
diff --git a/playground/CommandInfoUpdater/GarnetCommandsInfo.json b/playground/CommandInfoUpdater/GarnetCommandsInfo.json
index 04570affc7..1cc3b765dd 100644
--- a/playground/CommandInfoUpdater/GarnetCommandsInfo.json
+++ b/playground/CommandInfoUpdater/GarnetCommandsInfo.json
@@ -393,6 +393,31 @@
"KeySpecifications": null,
"SubCommands": null
},
+ {
+ "Command": "HCOLLECT",
+ "Name": "HCOLLECT",
+ "Arity": 2,
+ "Flags": "Admin, Write",
+ "FirstKey": 1,
+ "LastKey": 1,
+ "Step": 1,
+ "AclCategories": "Admin, Hash, Write, Garnet",
+ "KeySpecifications": [
+ {
+ "BeginSearch": {
+ "TypeDiscriminator": "BeginSearchIndex",
+ "Index": 1
+ },
+ "FindKeys": {
+ "TypeDiscriminator": "FindKeysRange",
+ "LastKey": 0,
+ "KeyStep": 1,
+ "Limit": 0
+ },
+ "Flags": "RW, Access, Update"
+ }
+ ]
+ },
{
"Command": "LATENCY",
"Name": "LATENCY",
@@ -730,6 +755,56 @@
}
]
},
+ {
+ "Command": "SPUBLISH",
+ "Name": "SPUBLISH",
+ "Arity": 3,
+ "Flags": "Loading, NoScript, PubSub, Stale",
+ "FirstKey": 1,
+ "LastKey": 1,
+ "Step": 1,
+ "AclCategories": "PubSub, Slow, Read",
+ "KeySpecifications": [
+ {
+ "BeginSearch": {
+ "TypeDiscriminator": "BeginSearchIndex",
+ "Index": 1
+ },
+ "FindKeys": {
+ "TypeDiscriminator": "FindKeysRange",
+ "LastKey": 0,
+ "KeyStep": 1,
+ "Limit": 0
+ },
+ "Flags": "RO"
+ }
+ ]
+ },
+ {
+ "Command": "SSUBSCRIBE",
+ "Name": "SSUBSCRIBE",
+ "Arity": -2,
+ "Flags": "Loading, NoScript, PubSub, Stale",
+ "FirstKey": 1,
+ "LastKey": -1,
+ "Step": 1,
+ "AclCategories": "PubSub, Slow, Read",
+ "KeySpecifications": [
+ {
+ "BeginSearch": {
+ "TypeDiscriminator": "BeginSearchIndex",
+ "Index": 1
+ },
+ "FindKeys": {
+ "TypeDiscriminator": "FindKeysRange",
+ "LastKey": -1,
+ "KeyStep": 1,
+ "Limit": 0
+ },
+ "Flags": "RO"
+ }
+ ]
+ },
{
"Command": "WATCH",
"Name": "WATCH",
diff --git a/test/Garnet.test/GarnetServerConfigTests.cs b/test/Garnet.test/GarnetServerConfigTests.cs
index 2b997dc9c4..b2a1e8faad 100644
--- a/test/Garnet.test/GarnetServerConfigTests.cs
+++ b/test/Garnet.test/GarnetServerConfigTests.cs
@@ -218,8 +218,7 @@ public void ImportExportConfigAzure()
if (TestUtils.IsRunningAzureTests)
{
// Delete blob if exists
- var deviceFactory = new AzureStorageNamedDeviceFactory(AzureEmulatedStorageString, default);
- deviceFactory.Initialize(AzureTestDirectory);
+ var deviceFactory = TestUtils.AzureStorageNamedDeviceFactoryCreator.Create(AzureTestDirectory);
deviceFactory.Delete(new FileDescriptor { directoryName = "" });
var parseSuccessful = ServerSettingsManager.TryParseCommandLineArguments(null, out var options, out var invalidOptions, out var exitGracefully);
@@ -243,7 +242,6 @@ public void ImportExportConfigAzure()
ClassicAssert.IsTrue(options.MemorySize == "128m");
// Delete blob
- deviceFactory.Initialize(AzureTestDirectory);
deviceFactory.Delete(new FileDescriptor { directoryName = "" });
}
}
diff --git a/test/Garnet.test/RespCommandTests.cs b/test/Garnet.test/RespCommandTests.cs
index 7e8cb67c0b..b1847014db 100644
--- a/test/Garnet.test/RespCommandTests.cs
+++ b/test/Garnet.test/RespCommandTests.cs
@@ -137,7 +137,8 @@ public void CommandsDocsCoverageTest()
}
var allCommands = Enum.GetValues().Except(noMetadataCommands).Except(internalOnlyCommands);
- CollectionAssert.AreEquivalent(allCommands, commandsWithDocs, "Some commands have missing docs. Please see https://microsoft.github.io/garnet/docs/dev/garnet-api#adding-command-info for more details.");
+ Assert.That(commandsWithDocs, Is.SupersetOf(allCommands),
+ "Some commands have missing docs. Please see https://microsoft.github.io/garnet/docs/dev/garnet-api#adding-command-info for more details.");
}
///
diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs
index d95d3ea4b4..062f03e4cc 100644
--- a/test/Garnet.test/TestUtils.cs
+++ b/test/Garnet.test/TestUtils.cs
@@ -72,6 +72,8 @@ internal static string AzureTestContainer
}
internal static string AzureTestDirectory => TestContext.CurrentContext.Test.MethodName;
internal const string AzureEmulatedStorageString = "UseDevelopmentStorage=true;";
+ internal static AzureStorageNamedDeviceFactoryCreator AzureStorageNamedDeviceFactoryCreator =
+ IsRunningAzureTests ? new AzureStorageNamedDeviceFactoryCreator(AzureEmulatedStorageString, null) : null;
public const string certFile = "testcert.pfx";
public const string certPassword = "placeholder";
@@ -287,8 +289,8 @@ public static GarnetServer CreateGarnetServer(
MetricsSamplingFrequency = metricsSamplingFreq,
LatencyMonitor = latencyMonitor,
DeviceFactoryCreator = useAzureStorage ?
- () => new AzureStorageNamedDeviceFactory(AzureEmulatedStorageString, logger)
- : () => new LocalStorageNamedDeviceFactory(logger: logger),
+ logger == null ? TestUtils.AzureStorageNamedDeviceFactoryCreator : new AzureStorageNamedDeviceFactoryCreator(AzureEmulatedStorageString, logger)
+ : new LocalStorageNamedDeviceFactoryCreator(logger: logger),
AuthSettings = authenticationSettings,
ExtensionBinPaths = extensionBinPaths,
ExtensionAllowUnsignedAssemblies = extensionAllowUnsignedAssemblies,
@@ -597,8 +599,8 @@ public static GarnetServerOptions GetGarnetServerOptions(
logger: logger)
: null,
DeviceFactoryCreator = useAzureStorage ?
- () => new AzureStorageNamedDeviceFactory(AzureEmulatedStorageString, logger)
- : () => new LocalStorageNamedDeviceFactory(logger: logger),
+ logger == null ? TestUtils.AzureStorageNamedDeviceFactoryCreator : new AzureStorageNamedDeviceFactoryCreator(AzureEmulatedStorageString, logger)
+ : new LocalStorageNamedDeviceFactoryCreator(logger: logger),
MainMemoryReplication = mainMemoryReplication,
AofMemorySize = aofMemorySize,
OnDemandCheckpoint = onDemandCheckpoint,