Skip to content

Commit

Permalink
Merged latest
Browse files Browse the repository at this point in the history
  • Loading branch information
Vijay-Nirmal committed Mar 9, 2025
2 parents 23d4ca3 + a01bb51 commit 9f437c3
Show file tree
Hide file tree
Showing 241 changed files with 9,881 additions and 4,352 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ dotnet_diagnostic.CA1866.severity = warning
dotnet_diagnostic.CA1867.severity = warning

# Usage rules
# Treat unnecessary usings as warnings
dotnet_diagnostic.IDE0005.severity = warning

# CA2254: Template should be a static expression
dotnet_diagnostic.CA2254.severity = warning

Expand Down
2 changes: 1 addition & 1 deletion Version.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<!-- VersionPrefix property for builds and packages -->
<PropertyGroup>
<VersionPrefix>1.0.57</VersionPrefix>
<VersionPrefix>1.0.59</VersionPrefix>
</PropertyGroup>
</Project>
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Microsoft.Extensions.Logging;
Expand Down
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
// Licensed under the MIT license.

using Garnet;
using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Embedded.server
{
Expand Down
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Net;
using System.Net.Security;
using System.Threading;
using Garnet.common;
using Garnet.networking;
using Garnet.server;
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Lua/LuaParams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public LuaParams(LuaMemoryManagementMode mode, bool memoryLimit, TimeSpan? timeo
/// Get the equivalent <see cref="LuaOptions"/>.
/// </summary>
public LuaOptions CreateOptions()
=> new(Mode, MemoryLimit ? "2m" : "", Timeout ?? System.Threading.Timeout.InfiniteTimeSpan);
=> new(Mode, MemoryLimit ? "2m" : "", Timeout ?? System.Threading.Timeout.InfiniteTimeSpan, LuaLoggingMode.Enable);

/// <summary>
/// String representation
Expand Down
4 changes: 2 additions & 2 deletions benchmark/BDN.benchmark/Operations/OperationsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ public virtual void GlobalSetup()
QuietMode = true,
EnableLua = true,
DisablePubSub = true,
LuaOptions = new(LuaMemoryManagementMode.Native, "", Timeout.InfiniteTimeSpan),
LuaOptions = new(LuaMemoryManagementMode.Native, "", Timeout.InfiniteTimeSpan, LuaLoggingMode.Enable),
};

if (Params.useAof)
{
opts.EnableAOF = true;
opts.UseAofNullDevice = true;
opts.MainMemoryReplication = true;
opts.FastAofTruncate = true;
opts.CommitFrequencyMs = -1;
opts.AofPageSize = "128m";
opts.AofMemorySize = "256m";
Expand Down
1 change: 0 additions & 1 deletion benchmark/Resp.benchmark/RespOnlineBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using System.Linq;
using System.Net;
Expand Down
1 change: 0 additions & 1 deletion charts/garnet/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ spec:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.registry }}/{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
command: ["./GarnetServer"]
{{- if or .Values.containers.args .Values.config.garnetConf .Values.config.existingSecret }}
args:
{{- range .Values.containers.args }}
Expand Down
13 changes: 10 additions & 3 deletions libs/client/ClientSession/GarnetClientSessionIncremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@

namespace Garnet.client
{
enum IncrementalSendType : byte
{
MIGRATE,
SYNC
}

public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
{
IncrementalSendType ist;
bool isMainStore;
byte* curr, head;
int keyValuePairCount;
Expand Down Expand Up @@ -183,9 +190,9 @@ private void TrackIterationProgress(int keyCount, int size, bool completed = fal
var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
if (completed || lastLog == 0 || duration >= iterationProgressFreq)
{
logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
completed ? "COMPLETED" : "MIGRATING",
isMainStore,
logger?.LogTrace("[{op}]: store:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
completed ? "COMPLETED" : ist,
isMainStore ? "MAIN STORE" : "OBJECT STORE",
totalKeyCount.ToString("N0"),
((long)((double)totalPayloadSize / 1024)).ToString("N0"));
lastLog = Stopwatch.GetTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void SetClusterMigrateHeader(string sourceNodeId, bool replace, bool isMa
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.MIGRATE;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;
var replaceOption = replace ? T : F;

Expand Down
115 changes: 115 additions & 0 deletions libs/client/ClientSession/GarnetClientSessionReplicationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
static ReadOnlySpan<byte> send_ckpt_metadata => "SEND_CKPT_METADATA"u8;
static ReadOnlySpan<byte> send_ckpt_file_segment => "SEND_CKPT_FILE_SEGMENT"u8;
static ReadOnlySpan<byte> begin_replica_recover => "BEGIN_REPLICA_RECOVER"u8;
static ReadOnlySpan<byte> attach_sync => "ATTACH_SYNC"u8;
static ReadOnlySpan<byte> sync => "SYNC"u8;

/// <summary>
/// Initiate checkpoint retrieval from replica by sending replica checkpoint information and AOF address range
Expand Down Expand Up @@ -352,5 +354,118 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
Interlocked.Increment(ref numCommands);
return tcs.Task;
}

/// <summary>
/// Initiate attach from replica
/// </summary>
/// <param name="syncMetadata"></param>
/// <returns></returns>
public Task<string> ExecuteAttachSync(byte[] syncMetadata)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(tcs);
byte* curr = offset;
int arraySize = 3;

while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//1
while (!RespWriteUtils.TryWriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//2
while (!RespWriteUtils.TryWriteBulkString(attach_sync, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//3
while (!RespWriteUtils.TryWriteBulkString(syncMetadata, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

Flush();
Interlocked.Increment(ref numCommands);
return tcs.Task;
}

/// <summary>
/// Set CLUSTER SYNC header info
/// </summary>
/// <param name="sourceNodeId"></param>
/// <param name="isMainStore"></param>
public void SetClusterSyncHeader(string sourceNodeId, bool isMainStore)
{
currTcsIterationTask = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsIterationTask);
curr = offset;
this.isMainStore = isMainStore;
this.ist = IncrementalSendType.SYNC;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;

var arraySize = 5;
while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 1
while (!RespWriteUtils.TryWriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 2
while (!RespWriteUtils.TryWriteBulkString(sync, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 3
while (!RespWriteUtils.TryWriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 4
while (!RespWriteUtils.TryWriteBulkString(storeType, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 5
// Reserve space for the bulk string header + final newline
while (ExtraSpace + 2 > (int)(end - curr))
{
Flush();
curr = offset;
}
head = curr;
curr += ExtraSpace;
}
}
}
60 changes: 34 additions & 26 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using System.Net;
using System.Text;

namespace Garnet.cluster
{
Expand Down Expand Up @@ -489,14 +490,14 @@ public IPEndPoint GetEndpointFromNodeId(string nodeid)
/// <returns>Formatted string.</returns>
public string GetClusterInfo(ClusterProvider clusterProvider)
{
var nodes = "";
var nodesStringBuilder = new StringBuilder();
for (ushort i = 1; i <= NumWorkers; i++)
{
var info = default(ConnectionInfo);
_ = clusterProvider?.clusterManager?.GetConnectionInfo(workers[i].Nodeid, out info);
nodes += GetNodeInfo(i, info);
GetNodeInfo(i, info, nodesStringBuilder);
}
return nodes;
return nodesStringBuilder.ToString();
}

/// <summary>
Expand All @@ -516,24 +517,33 @@ public string GetNodeInfo(ushort workerId, ConnectionInfo info)
//<config-epoch>
//<link-state>
//<slot> <slot> ... <slot>
var nodeInfoStringBuilder = new StringBuilder();
GetNodeInfo(workerId, info, nodeInfoStringBuilder);
return nodeInfoStringBuilder.ToString();
}

return $"{workers[workerId].Nodeid} " +
$"{workers[workerId].Address}:{workers[workerId].Port}@{workers[workerId].Port + 10000},{workers[workerId].hostname} " +
$"{(workerId == 1 ? "myself," : "")}{(workers[workerId].Role == NodeRole.PRIMARY ? "master" : "slave")} " +
$"{(workers[workerId].Role == NodeRole.REPLICA ? workers[workerId].ReplicaOfNodeId : "-")} " +
$"{info.ping} " +
$"{info.pong} " +
$"{workers[workerId].ConfigEpoch} " +
$"{(info.connected || workerId == 1 ? "connected" : "disconnected")}" +
$"{GetSlotRange(workerId)}" +
$"{GetSpecialStates(workerId)}\n";
private void GetNodeInfo(ushort workerId, ConnectionInfo info, StringBuilder nodeInfoStringBuilder)
{
_ = nodeInfoStringBuilder
.Append(workers[workerId].Nodeid).Append(' ')
.Append(workers[workerId].Address).Append(':').Append(workers[workerId].Port)
.Append('@').Append(workers[workerId].Port + 10000).Append(',').Append(workers[workerId].hostname).Append(' ')
.Append(workerId == 1 ? "myself," : "")
.Append(workers[workerId].Role == NodeRole.PRIMARY ? "master" : "slave").Append(' ')
.Append(workers[workerId].Role == NodeRole.REPLICA ? workers[workerId].ReplicaOfNodeId : '-').Append(' ')
.Append(info.ping).Append(' ')
.Append(info.pong).Append(' ')
.Append(workers[workerId].ConfigEpoch).Append(' ')
.Append(info.connected || workerId == 1 ? "connected" : "disconnected");
AppendSlotRange(nodeInfoStringBuilder, workerId);
AppendSpecialStates(nodeInfoStringBuilder, workerId);
_ = nodeInfoStringBuilder.Append('\n');
}

private string GetSpecialStates(ushort workerId)
private void AppendSpecialStates(StringBuilder stringBuilder, uint workerId)
{
// Only print special states for local node
if (workerId != 1) return "";
var specialStates = "";
if (workerId != 1) return;
for (var slot = 0; slot < slotMap.Length; slot++)
{
var _workerId = slotMap[slot]._workerId;
Expand All @@ -545,14 +555,14 @@ private string GetSpecialStates(ushort workerId)
var _nodeId = workers[_workerId].Nodeid;
if (_nodeId == null) continue;

specialStates += _state switch
stringBuilder.Append(_state switch
{
SlotState.MIGRATING => $" [{slot}->-{_nodeId}]",
SlotState.IMPORTING => $" [{slot}-<-{_nodeId}]",
_ => ""
};
});
}
return specialStates;
return;
}

/// <summary>
Expand Down Expand Up @@ -721,9 +731,8 @@ public string GetSlotsInfo()
return completeSlotInfo;
}

private string GetSlotRange(ushort workerId)
private void AppendSlotRange(StringBuilder stringBuilder, uint workerId)
{
string result = "";
ushort start = ushort.MaxValue, end = 0;
for (ushort i = 0; i < MAX_HASH_SLOT_VALUE; i++)
{
Expand All @@ -736,19 +745,18 @@ private string GetSlotRange(ushort workerId)
{
if (start != ushort.MaxValue)
{
if (end == start) result += $" {start}";
else result += $" {start}-{end}";
if (end == start) stringBuilder.Append($" {start}");
else stringBuilder.Append($" {start}-{end}");
start = ushort.MaxValue;
end = 0;
}
}
}
if (start != ushort.MaxValue)
{
if (end == start) result += $" {start}";
else result += $" {start}-{end}";
if (end == start) stringBuilder.Append($" {start}");
else stringBuilder.Append($" {start}-{end}");
}
return result;
}

/// <summary>
Expand Down
Loading

0 comments on commit 9f437c3

Please sign in to comment.