Skip to content

Commit

Permalink
Merge branch 'main' into tedhar/net9
Browse files Browse the repository at this point in the history
  • Loading branch information
darrenge authored Mar 8, 2025
2 parents 45a169f + a01bb51 commit af4a7eb
Show file tree
Hide file tree
Showing 172 changed files with 4,864 additions and 3,390 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ dotnet_diagnostic.CA1866.severity = warning
dotnet_diagnostic.CA1867.severity = warning

# Usage rules
# Treat unnecessary usings as warnings
dotnet_diagnostic.IDE0005.severity = warning

# CA2254: Template should be a static expression
dotnet_diagnostic.CA2254.severity = warning

Expand Down
2 changes: 1 addition & 1 deletion Version.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<!-- VersionPrefix property for builds and packages -->
<PropertyGroup>
<VersionPrefix>1.0.58</VersionPrefix>
<VersionPrefix>1.0.59</VersionPrefix>
</PropertyGroup>
</Project>
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Microsoft.Extensions.Logging;
Expand Down
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
// Licensed under the MIT license.

using Garnet;
using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Embedded.server
{
Expand Down
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Net;
using System.Net.Security;
using System.Threading;
using Garnet.common;
using Garnet.networking;
using Garnet.server;
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Lua/LuaParams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public LuaParams(LuaMemoryManagementMode mode, bool memoryLimit, TimeSpan? timeo
/// Get the equivalent <see cref="LuaOptions"/>.
/// </summary>
public LuaOptions CreateOptions()
=> new(Mode, MemoryLimit ? "2m" : "", Timeout ?? System.Threading.Timeout.InfiniteTimeSpan);
=> new(Mode, MemoryLimit ? "2m" : "", Timeout ?? System.Threading.Timeout.InfiniteTimeSpan, LuaLoggingMode.Enable);

/// <summary>
/// String representation
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Operations/OperationsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public virtual void GlobalSetup()
QuietMode = true,
EnableLua = true,
DisablePubSub = true,
LuaOptions = new(LuaMemoryManagementMode.Native, "", Timeout.InfiniteTimeSpan),
LuaOptions = new(LuaMemoryManagementMode.Native, "", Timeout.InfiniteTimeSpan, LuaLoggingMode.Enable),
};

if (Params.useAof)
Expand Down
1 change: 0 additions & 1 deletion benchmark/Resp.benchmark/RespOnlineBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using System.Linq;
using System.Net;
Expand Down
60 changes: 34 additions & 26 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using System.Net;
using System.Text;

namespace Garnet.cluster
{
Expand Down Expand Up @@ -489,14 +490,14 @@ public IPEndPoint GetEndpointFromNodeId(string nodeid)
/// <returns>Formatted string.</returns>
public string GetClusterInfo(ClusterProvider clusterProvider)
{
var nodes = "";
var nodesStringBuilder = new StringBuilder();
for (ushort i = 1; i <= NumWorkers; i++)
{
var info = default(ConnectionInfo);
_ = clusterProvider?.clusterManager?.GetConnectionInfo(workers[i].Nodeid, out info);
nodes += GetNodeInfo(i, info);
GetNodeInfo(i, info, nodesStringBuilder);
}
return nodes;
return nodesStringBuilder.ToString();
}

/// <summary>
Expand All @@ -516,24 +517,33 @@ public string GetNodeInfo(ushort workerId, ConnectionInfo info)
//<config-epoch>
//<link-state>
//<slot> <slot> ... <slot>
var nodeInfoStringBuilder = new StringBuilder();
GetNodeInfo(workerId, info, nodeInfoStringBuilder);
return nodeInfoStringBuilder.ToString();
}

return $"{workers[workerId].Nodeid} " +
$"{workers[workerId].Address}:{workers[workerId].Port}@{workers[workerId].Port + 10000},{workers[workerId].hostname} " +
$"{(workerId == 1 ? "myself," : "")}{(workers[workerId].Role == NodeRole.PRIMARY ? "master" : "slave")} " +
$"{(workers[workerId].Role == NodeRole.REPLICA ? workers[workerId].ReplicaOfNodeId : "-")} " +
$"{info.ping} " +
$"{info.pong} " +
$"{workers[workerId].ConfigEpoch} " +
$"{(info.connected || workerId == 1 ? "connected" : "disconnected")}" +
$"{GetSlotRange(workerId)}" +
$"{GetSpecialStates(workerId)}\n";
private void GetNodeInfo(ushort workerId, ConnectionInfo info, StringBuilder nodeInfoStringBuilder)
{
_ = nodeInfoStringBuilder
.Append(workers[workerId].Nodeid).Append(' ')
.Append(workers[workerId].Address).Append(':').Append(workers[workerId].Port)
.Append('@').Append(workers[workerId].Port + 10000).Append(',').Append(workers[workerId].hostname).Append(' ')
.Append(workerId == 1 ? "myself," : "")
.Append(workers[workerId].Role == NodeRole.PRIMARY ? "master" : "slave").Append(' ')
.Append(workers[workerId].Role == NodeRole.REPLICA ? workers[workerId].ReplicaOfNodeId : '-').Append(' ')
.Append(info.ping).Append(' ')
.Append(info.pong).Append(' ')
.Append(workers[workerId].ConfigEpoch).Append(' ')
.Append(info.connected || workerId == 1 ? "connected" : "disconnected");
AppendSlotRange(nodeInfoStringBuilder, workerId);
AppendSpecialStates(nodeInfoStringBuilder, workerId);
_ = nodeInfoStringBuilder.Append('\n');
}

private string GetSpecialStates(ushort workerId)
private void AppendSpecialStates(StringBuilder stringBuilder, uint workerId)
{
// Only print special states for local node
if (workerId != 1) return "";
var specialStates = "";
if (workerId != 1) return;
for (var slot = 0; slot < slotMap.Length; slot++)
{
var _workerId = slotMap[slot]._workerId;
Expand All @@ -545,14 +555,14 @@ private string GetSpecialStates(ushort workerId)
var _nodeId = workers[_workerId].Nodeid;
if (_nodeId == null) continue;

specialStates += _state switch
stringBuilder.Append(_state switch
{
SlotState.MIGRATING => $" [{slot}->-{_nodeId}]",
SlotState.IMPORTING => $" [{slot}-<-{_nodeId}]",
_ => ""
};
});
}
return specialStates;
return;
}

/// <summary>
Expand Down Expand Up @@ -721,9 +731,8 @@ public string GetSlotsInfo()
return completeSlotInfo;
}

private string GetSlotRange(ushort workerId)
private void AppendSlotRange(StringBuilder stringBuilder, uint workerId)
{
string result = "";
ushort start = ushort.MaxValue, end = 0;
for (ushort i = 0; i < MAX_HASH_SLOT_VALUE; i++)
{
Expand All @@ -736,19 +745,18 @@ private string GetSlotRange(ushort workerId)
{
if (start != ushort.MaxValue)
{
if (end == start) result += $" {start}";
else result += $" {start}-{end}";
if (end == start) stringBuilder.Append($" {start}");
else stringBuilder.Append($" {start}-{end}");
start = ushort.MaxValue;
end = 0;
}
}
}
if (start != ushort.MaxValue)
{
if (end == start) result += $" {start}";
else result += $" {start}-{end}";
if (end == start) stringBuilder.Append($" {start}");
else stringBuilder.Append($" {start}-{end}");
}
return result;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ internal sealed unsafe class SnapshotIteratorManager
long currentFlushEventCount = 0;
long lastFlushEventCount = 0;


public long CheckpointCoveredAddress { get; private set; }

public SnapshotIteratorManager(ReplicationSyncManager replicationSyncManager, CancellationToken cancellationToken, ILogger logger = null)
Expand Down Expand Up @@ -195,8 +194,9 @@ public void OnStop(bool completed, long numberOfRecords, bool isMainStore, long
// Wait for flush and response to complete
replicationSyncManager.WaitForFlush().GetAwaiter().GetResult();

// Enqueue version change commit
replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(isMainStore, targetVersion);
// Enqueue commit end marker
var entryType = isMainStore ? AofEntryType.MainStoreStreamingCheckpointEndCommit : AofEntryType.ObjectStoreStreamingCheckpointEndCommit;
replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(entryType, targetVersion);

logger?.LogTrace("{OnStop} {store} {numberOfRecords} {targetVersion}",
nameof(OnStop), isMainStore ? "MAIN STORE" : "OBJECT STORE", numberOfRecords, targetVersion);
Expand Down
5 changes: 4 additions & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
{
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
return;
storeWrapper.EnqueueCommit(isMainStore, newVersion, streaming: true);
var entryType = clusterProvider.serverOptions.ReplicaDisklessSync ?
(isMainStore ? AofEntryType.MainStoreStreamingCheckpointStartCommit : AofEntryType.ObjectStoreStreamingCheckpointStartCommit) :
(isMainStore ? AofEntryType.MainStoreCheckpointStartCommit : AofEntryType.ObjectStoreCheckpointStartCommit);
storeWrapper.EnqueueCommit(entryType, newVersion);
}

/// <summary>
Expand Down
1 change: 0 additions & 1 deletion libs/common/Logging/FileLoggerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using Microsoft.Extensions.Logging;
Expand Down
62 changes: 53 additions & 9 deletions libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,13 @@ public static bool TryReadInt32Safe(ref byte* ptr, byte* end, out int value, out
/// <param name="length">If parsing was successful, contains the extracted length from the header.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <param name="isArray">Whether to parse an array length header ('*...\r\n') or a string length header ('$...\r\n').</param>
/// <param name="expectedSigil">Expected type of RESP header, defaults to string ('$').</param>
/// <returns>True if a length header was successfully read.</returns>
/// <exception cref="RespParsingException">Thrown if the length header is negative.</exception>
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryReadUnsignedLengthHeader(out int length, ref byte* ptr, byte* end, bool isArray = false)
public static bool TryReadUnsignedLengthHeader(out int length, ref byte* ptr, byte* end, char expectedSigil = '$')
{
length = -1;
if (ptr + 3 > end)
Expand All @@ -324,7 +324,7 @@ public static bool TryReadUnsignedLengthHeader(out int length, ref byte* ptr, by
RespParsingException.ThrowInvalidStringLength(length);
}

if (!TryReadSignedLengthHeader(out length, ref ptr, end, isArray))
if (!TryReadSignedLengthHeader(out length, ref ptr, end, expectedSigil))
return false;

return true;
Expand All @@ -342,12 +342,12 @@ public static bool TryReadUnsignedLengthHeader(out int length, ref byte* ptr, by
/// <param name="length">If parsing was successful, contains the extracted length from the header.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <param name="isArray">Whether to parse an array length header ('*...\r\n') or a string length header ('$...\r\n').</param>
/// <param name="expectedSigil">Expected type of RESP header, defaults to string ('$').</param>
/// <returns>True if a length header was successfully read.</returns>
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryReadSignedLengthHeader(out int length, ref byte* ptr, byte* end, bool isArray = false)
public static bool TryReadSignedLengthHeader(out int length, ref byte* ptr, byte* end, char expectedSigil = '$')
{
length = -1;
if (ptr + 3 > end)
Expand All @@ -357,7 +357,7 @@ public static bool TryReadSignedLengthHeader(out int length, ref byte* ptr, byte
var negative = *readHead == '-';

// String length headers must start with a '$', array headers with '*'
if (*ptr != (isArray ? '*' : '$'))
if (*ptr != expectedSigil)
{
RespParsingException.ThrowUnexpectedToken(*ptr);
}
Expand Down Expand Up @@ -486,7 +486,7 @@ public static bool TryReadInt64(out long number, ref byte* ptr, byte* end, out b
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
public static bool TryReadUnsignedArrayLength(out int length, ref byte* ptr, byte* end)
=> TryReadUnsignedLengthHeader(out length, ref ptr, end, isArray: true);
=> TryReadUnsignedLengthHeader(out length, ref ptr, end, expectedSigil: '*');

/// <summary>
/// Tries to read a RESP array length header from the given ASCII-encoded RESP string
Expand All @@ -498,11 +498,55 @@ public static bool TryReadUnsignedArrayLength(out int length, ref byte* ptr, byt
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <returns>True if a length header was successfully read.</returns>
/// <exception cref="RespParsingException">Thrown if the length header is negative.</exception>
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
public static bool TryReadSignedArrayLength(out int length, ref byte* ptr, byte* end)
=> TryReadSignedLengthHeader(out length, ref ptr, end, isArray: true);
=> TryReadSignedLengthHeader(out length, ref ptr, end, expectedSigil: '*');

/// <summary>
/// Tries to read a RESP3 map length header from the given ASCII-encoded RESP string
/// and, if successful, moves the given ptr to the end of the length header.
/// <para />
/// NOTE: It will not throw an exception if length header is negative.
/// </summary>
/// <param name="length">If parsing was successful, contains the extracted length from the header.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <returns>True if a length header was successfully read.</returns>
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
public static bool TryReadSignedMapLength(out int length, ref byte* ptr, byte* end)
=> TryReadSignedLengthHeader(out length, ref ptr, end, expectedSigil: '%');

/// <summary>
/// Tries to read a RESP3 set length header from the given ASCII-encoded RESP string
/// and, if successful, moves the given ptr to the end of the length header.
/// <para />
/// NOTE: It will not throw an exception if length header is negative.
/// </summary>
/// <param name="length">If parsing was successful, contains the extracted length from the header.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <returns>True if a length header was successfully read.</returns>
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
public static bool TryReadSignedSetLength(out int length, ref byte* ptr, byte* end)
=> TryReadSignedLengthHeader(out length, ref ptr, end, expectedSigil: '~');

/// <summary>
/// Tries to read a RESP3 verbatim string length header from the given ASCII-encoded RESP string
/// and, if successful, moves the given ptr to the end of the length header.
/// <para />
/// NOTE: It will not throw an exception if length header is negative.
/// </summary>
/// <param name="length">If parsing was successful, contains the extracted length from the header.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <returns>True if a length header was successfully read.</returns>
/// <exception cref="RespParsingException">Thrown if unexpected token is read.</exception>
/// <exception cref="RespParsingException">Thrown if integer overflow occurs.</exception>
public static bool TryReadVerbatimStringLength(out int length, ref byte* ptr, byte* end)
=> TryReadSignedLengthHeader(out length, ref ptr, end, expectedSigil: '=');

/// <summary>
/// Reads a signed 32-bit integer with length header
Expand Down
Loading

0 comments on commit af4a7eb

Please sign in to comment.