From 8bfb8c6e5bc78067b54edf7c7e2ebe5de995e9c0 Mon Sep 17 00:00:00 2001 From: Vasileios Zois <96085550+vazois@users.noreply.github.com> Date: Thu, 30 Jan 2025 16:55:40 -0800 Subject: [PATCH 1/2] Implement CLUSTER PUBSUB (#866) * skip Interlocked.Exchange and FlushConfig when no update detected on merge * special case of zero epoch necessitates tracking of explicit slot update * move key spec extraction to clusterProvider * implement CLUSTER PUBLISH api * implement SSUBSCRIBE api * implement SPUBLISH api * implement separate code path for CLUSTER SPUBLISH * expose max number of outstanding pubsub forwarding tasks parameter * add API to forward published messages using GarnetClient * add methods to extract nodeIds of all nodes in cluster and all nodes in a shard * hook up published message forwarding * PUBLISH and SPUBLISH benchmark addition * fix check for getting most recent config instance * optimization 1: calculate candidates for published message forwarding only when config gets updated * optimization2: ignore response from cluster publish * add publish BDN * minimize task creation * load no response task * change to pinned array * simplify config update for publish forward * wip; wait for response vs fire and forget * add protected connection initialization for gossip-publish * remove BDN because not useful * remove unused NoResponse flag * remove option for tasks * ensure noResponse for slot verification of cluster commands * add SPUBLISH and PUBLISH docs * correct typo * addressing comments v1 * addressing comments v2 * rename new lock interface to indicate clear semantics --------- Co-authored-by: Tal Zaccai --- .../BDN.benchmark/Cluster/ClusterContext.cs | 2 +- benchmark/Resp.benchmark/OpType.cs | 1 + benchmark/Resp.benchmark/Program.cs | 2 +- benchmark/Resp.benchmark/ReqGen.cs | 2 + benchmark/Resp.benchmark/ReqGenLoadBuffers.cs | 4 + benchmark/Resp.benchmark/ReqGenUtils.cs | 4 + benchmark/Resp.benchmark/RespOnlineBench.cs | 21 +- libs/client/GarnetClient.cs | 180 ++++++++---------- .../GarnetClientClusterCommands.cs | 10 + .../GarnetClientAPI/GarnetClientExecuteAPI.cs | 30 ++- libs/client/GarnetClientProcessReplies.cs | 2 +- libs/client/NetworkWriter.cs | 10 +- libs/cluster/Server/ClusterConfig.cs | 26 +++ libs/cluster/Server/ClusterProvider.cs | 70 +++++++ .../Server/Failover/ReplicaFailoverSession.cs | 27 +-- libs/cluster/Server/GarnetClientExtensions.cs | 7 + .../Server/GarnetClusterConnectionStore.cs | 55 ++++++ libs/cluster/Server/GarnetServerNode.cs | 63 +++++- libs/cluster/Server/Gossip.cs | 64 ++++++- libs/cluster/Session/ClusterCommands.cs | 2 +- libs/cluster/Session/ClusterSession.cs | 9 + .../Session/RespClusterBasicCommands.cs | 30 ++- .../SlotVerification/RespClusterSlotVerify.cs | 27 +++ libs/common/SingleWriterMultiReaderLock.cs | 5 +- libs/host/GarnetServer.cs | 2 +- libs/resources/RespCommandsDocs.json | 37 ++++ libs/resources/RespCommandsInfo.json | 87 +++++++++ libs/server/AOF/AofProcessor.cs | 1 + libs/server/Cluster/IClusterProvider.cs | 15 ++ libs/server/Cluster/IClusterSession.cs | 10 + libs/server/PubSub/SubscribeBroker.cs | 15 +- libs/server/Resp/CmdStrings.cs | 5 + libs/server/Resp/Parser/RespCommand.cs | 24 ++- libs/server/Resp/PubSubCommands.cs | 55 +++++- libs/server/Resp/RespServerSession.cs | 6 +- .../Resp/RespServerSessionSlotVerify.cs | 64 +------ libs/server/StoreWrapper.cs | 8 + libs/server/Transaction/TxnKeyManager.cs | 1 + .../GarnetCommandsDocs.json | 23 +++ .../GarnetCommandsInfo.json | 37 ++++ .../CommandInfoUpdater/SupportedCommand.cs | 6 
+- test/Garnet.test/Resp/ACL/RespCommandTests.cs | 117 +++++++++++- 42 files changed, 923 insertions(+), 243 deletions(-) diff --git a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs index 631a7dda90..5805a185de 100644 --- a/benchmark/BDN.benchmark/Cluster/ClusterContext.cs +++ b/benchmark/BDN.benchmark/Cluster/ClusterContext.cs @@ -68,7 +68,7 @@ public void CreateGetSet(int keySize = 8, int valueSize = 32, int batchSize = 10 benchUtils.RandomBytes(ref pairs[i].Item2); } - var setByteCount = batchSize * ("*2\r\n$3\r\nSET\r\n"u8.Length + 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2 + 1 + NumUtils.CountDigits(valueSize) + 2 + valueSize + 2); + var setByteCount = batchSize * ("*3\r\n$3\r\nSET\r\n"u8.Length + 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2 + 1 + NumUtils.CountDigits(valueSize) + 2 + valueSize + 2); var setReq = new Request(setByteCount); var curr = setReq.ptr; var end = curr + setReq.buffer.Length; diff --git a/benchmark/Resp.benchmark/OpType.cs b/benchmark/Resp.benchmark/OpType.cs index 4b4c3e4db5..b4d7f2fbd8 100644 --- a/benchmark/Resp.benchmark/OpType.cs +++ b/benchmark/Resp.benchmark/OpType.cs @@ -12,6 +12,7 @@ public enum OpType DBSIZE, READ_TXN, WRITE_TXN, READWRITETX, WATCH_TXN, SAMPLEUPDATETX, SAMPLEDELETETX, SCRIPTSET, SCRIPTGET, SCRIPTRETKEY, + PUBLISH, SPUBLISH, READONLY = 8888, AUTH = 9999, } diff --git a/benchmark/Resp.benchmark/Program.cs b/benchmark/Resp.benchmark/Program.cs index cb3a151e8d..42a41f8de2 100644 --- a/benchmark/Resp.benchmark/Program.cs +++ b/benchmark/Resp.benchmark/Program.cs @@ -219,7 +219,7 @@ static void RunBasicCommandsBenchmark(Options opts) int keyLen = opts.KeyLength; int valueLen = opts.ValueLength; - if (opts.Op == OpType.ZADD || opts.Op == OpType.ZREM || opts.Op == OpType.ZADDREM || opts.Op == OpType.PING || opts.Op == OpType.GEOADD || opts.Op == OpType.GEOADDREM || opts.Op == OpType.SETEX || opts.Op == OpType.ZCARD || opts.Op == OpType.ZADDCARD) + if (opts.Op == OpType.PUBLISH || opts.Op == OpType.SPUBLISH || opts.Op == OpType.ZADD || opts.Op == OpType.ZREM || opts.Op == OpType.ZADDREM || opts.Op == OpType.PING || opts.Op == OpType.GEOADD || opts.Op == OpType.GEOADDREM || opts.Op == OpType.SETEX || opts.Op == OpType.ZCARD || opts.Op == OpType.ZADDCARD) opts.SkipLoad = true; //if we have scripts ops we need to load them in memory diff --git a/benchmark/Resp.benchmark/ReqGen.cs b/benchmark/Resp.benchmark/ReqGen.cs index e7995d69c3..89cf9fab47 100644 --- a/benchmark/Resp.benchmark/ReqGen.cs +++ b/benchmark/Resp.benchmark/ReqGen.cs @@ -204,6 +204,8 @@ public static (int, int) OnResponse(byte* buf, int bytesRead, int opType) case OpType.DEL: case OpType.INCR: case OpType.DBSIZE: + case OpType.PUBLISH: + case OpType.SPUBLISH: for (int i = 0; i < bytesRead; i++) if (buf[i] == ':') count++; break; diff --git a/benchmark/Resp.benchmark/ReqGenLoadBuffers.cs b/benchmark/Resp.benchmark/ReqGenLoadBuffers.cs index fa34703eba..fd452599df 100644 --- a/benchmark/Resp.benchmark/ReqGenLoadBuffers.cs +++ b/benchmark/Resp.benchmark/ReqGenLoadBuffers.cs @@ -133,6 +133,8 @@ private bool GenerateBatch(int i, int start, int end, OpType opType) OpType.SCRIPTSET => System.Text.Encoding.ASCII.GetBytes($"*5\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1SetScript}\r\n$1\r\n1\r\n"), OpType.SCRIPTGET => System.Text.Encoding.ASCII.GetBytes($"*4\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1GetScript}\r\n$1\r\n1\r\n"), OpType.SCRIPTRETKEY => 
System.Text.Encoding.ASCII.GetBytes($"*4\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1RetKeyScript}\r\n$1\r\n1\r\n"), + OpType.PUBLISH => System.Text.Encoding.ASCII.GetBytes($"*3\r\n$7\r\nPUBLISH\r\n"), + OpType.SPUBLISH => System.Text.Encoding.ASCII.GetBytes($"*3\r\n$8\r\nSPUBLISH\r\n"), _ => null }; @@ -174,6 +176,8 @@ private bool GenerateBatch(int i, int start, int end, OpType opType) case OpType.SCRIPTSET: case OpType.SCRIPTGET: case OpType.SCRIPTRETKEY: + case OpType.PUBLISH: + case OpType.SPUBLISH: writeSuccess = GenerateSingleKeyValueOp(i, opHeader, start, end, opType); return writeSuccess; default: diff --git a/benchmark/Resp.benchmark/ReqGenUtils.cs b/benchmark/Resp.benchmark/ReqGenUtils.cs index 0095b15c96..be7f601c40 100644 --- a/benchmark/Resp.benchmark/ReqGenUtils.cs +++ b/benchmark/Resp.benchmark/ReqGenUtils.cs @@ -94,6 +94,8 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType) case OpType.SCRIPTSET: case OpType.SCRIPTGET: case OpType.SCRIPTRETKEY: + case OpType.PUBLISH: + case OpType.SPUBLISH: if (!WriteKey(ref curr, vend, out keyData)) return false; break; @@ -189,6 +191,8 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType) case OpType.MPFADD: case OpType.SET: case OpType.SCRIPTSET: + case OpType.PUBLISH: + case OpType.SPUBLISH: RandomString(); if (!WriteStringBytes(ref curr, vend, valueBuffer)) return false; diff --git a/benchmark/Resp.benchmark/RespOnlineBench.cs b/benchmark/Resp.benchmark/RespOnlineBench.cs index 15102a458c..ddb61f0b02 100644 --- a/benchmark/Resp.benchmark/RespOnlineBench.cs +++ b/benchmark/Resp.benchmark/RespOnlineBench.cs @@ -595,7 +595,6 @@ public async void OpRunnerGarnetClientSession(int thread_id) var op = SelectOpType(rand); var startTimestamp = Stopwatch.GetTimestamp(); var c = opts.Pool ? 
await gcsPool.GetAsync() : client; - _ = op switch { OpType.PING => await c.ExecuteAsync(["PING"]), @@ -605,6 +604,8 @@ public async void OpRunnerGarnetClientSession(int thread_id) OpType.DEL => await c.ExecuteAsync(["DEL", req.GenerateKey()]), OpType.SETBIT => await c.ExecuteAsync(["SETBIT", req.GenerateKey(), req.GenerateBitOffset()]), OpType.GETBIT => await c.ExecuteAsync(["GETBIT", req.GenerateKey(), req.GenerateBitOffset()]), + OpType.PUBLISH => await c.ExecuteAsync(["PUBLISH", req.GenerateKey(), req.GenerateValue()]), + OpType.SPUBLISH => await c.ExecuteAsync(["SPUBLISH", req.GenerateKey(), req.GenerateValue()]), OpType.ZADD => await ZADD(), OpType.ZREM => await ZREM(), OpType.ZCARD => await ZCARD(), @@ -717,6 +718,12 @@ public async void OpRunnerGarnetClientSessionParallel(int thread_id, int paralle case OpType.GETBIT: c.ExecuteBatch(["GETBIT", req.GenerateKey(), req.GenerateBitOffset()]); break; + case OpType.PUBLISH: + c.ExecuteBatch(["PUBLISH", req.GenerateKey(), req.GenerateValue()]); + break; + case OpType.SPUBLISH: + c.ExecuteBatch(["SPUBLISH", req.GenerateKey(), req.GenerateValue()]); + break; default: throw new Exception($"opType: {op} benchmark not supported with {opts.Client} ClientType!"); @@ -1046,6 +1053,12 @@ public async void OpRunnerSERedis(int thread_id) case OpType.DEL: await db.KeyDeleteAsync(req.GenerateKey()); break; + case OpType.PUBLISH: + await db.PublishAsync(RedisChannel.Literal(req.GenerateKey()), req.GenerateValue()); + break; + case OpType.SPUBLISH: + await db.ExecuteAsync("SPUBLISH", req.GenerateKey(), req.GenerateValue()); + break; case OpType.ZADD: { var key = req.GenerateKey(); @@ -1121,6 +1134,12 @@ public async void OpRunnerSERedisParallel(int thread_id, int parallel) case OpType.SET: tasks[offset++] = db.StringSetAsync(req.GenerateKey(), req.GenerateValue()); break; + case OpType.PUBLISH: + tasks[offset++] = db.PublishAsync(RedisChannel.Literal(req.GenerateKey()), req.GenerateValue()); + break; + case OpType.SPUBLISH: + tasks[offset++] = db.ExecuteAsync("SPUBLISH", req.GenerateKey(), req.GenerateValue()); + break; case OpType.SETEX: tasks[offset++] = db.StringSetAsync(req.GenerateKey(), req.GenerateValue(), TimeSpan.FromSeconds(opts.Ttl)); break; diff --git a/libs/client/GarnetClient.cs b/libs/client/GarnetClient.cs index 6ee24e0406..ba2089b2c9 100644 --- a/libs/client/GarnetClient.cs +++ b/libs/client/GarnetClient.cs @@ -50,7 +50,6 @@ public sealed partial class GarnetClient : IServerHook, IMessageConsumer, IDispo readonly int bufferSize; readonly int maxOutstandingTasks; NetworkWriter networkWriter; - INetworkSender networkSender; readonly TcsWrapper[] tcsArray; readonly SslClientAuthenticationOptions sslOptions; @@ -163,7 +162,7 @@ public GarnetClient( this.maxOutstandingTasks = maxOutstandingTasks; this.sslOptions = tlsOptions; this.disposed = 0; - this.tcsArray = new TcsWrapper[maxOutstandingTasks]; + this.tcsArray = GC.AllocateArray(maxOutstandingTasks, pinned: true); this.memoryPool = memoryPool ?? MemoryPool.Shared; this.logger = logger; this.latency = recordLatency ? 
new LongHistogram(1, TimeStamp.Seconds(100), 2) : null; @@ -191,7 +190,6 @@ public void Connect(CancellationToken token = default) socket = CreateSendSocket(timeoutMilliseconds); networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger); networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false).GetAwaiter().GetResult(); - networkSender = networkHandler.GetNetworkSender(); if (timeoutMilliseconds > 0) { @@ -224,7 +222,6 @@ public async Task ConnectAsync(CancellationToken token = default) socket = CreateSendSocket(timeoutMilliseconds); networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger); await networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false); - networkSender = networkHandler.GetNetworkSender(); if (timeoutMilliseconds > 0) { @@ -507,13 +504,15 @@ async ValueTask AwaitPreviousTaskAsync(int taskId) case TaskType.MemoryByteArrayAsync: if (oldTcs.memoryByteArrayTcs != null) await oldTcs.memoryByteArrayTcs.Task.ConfigureAwait(false); break; + case TaskType.LongAsync: + if (oldTcs.longTcs != null) await oldTcs.longTcs.Task.ConfigureAwait(false); + break; } } catch { if (Disposed) ThrowException(disposeException); } - await Task.Yield(); oldTcs = tcsArray[shortTaskId]; } } @@ -631,44 +630,30 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, string par } } - async ValueTask InternalExecuteAsync(Memory op, Memory clusterOp, string nodeId, long currentAddress, long nextAddress, long payloadPtr, int payloadLength, CancellationToken token = default) + async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, Memory param1, Memory param2, CancellationToken token = default) { - Debug.Assert(nodeId != null); - + tcs.timestamp = GetTimestamp(); int totalLen = 0; int arraySize = 1; totalLen += op.Length; - int len = clusterOp.Length; - totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; - - len = Encoding.UTF8.GetByteCount(nodeId); - totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; - - len = NumUtils.CountDigits(currentAddress); - totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; - - len = NumUtils.CountDigits(nextAddress); - totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; - - len = payloadLength; - totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; - - totalLen += 1 + NumUtils.CountDigits(arraySize) + 2; - - if (totalLen > networkWriter.PageSize) + if (!param1.IsEmpty) { - ThrowException(new Exception($"Entry of size {totalLen} does not fit on page of size {networkWriter.PageSize}. 
Try increasing sendPageSize parameter to GarnetClient constructor.")); + int len = param1.Length; + totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; + arraySize++; + } + if (!param2.IsEmpty) + { + int len = param2.Length; + totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; + arraySize++; } - // No need for gate as this is a void return - // await InputGateAsync(token); + totalLen += 1 + NumUtils.CountDigits(arraySize) + 2; + CheckLength(totalLen, tcs); + await InputGateAsync(token); try { @@ -698,6 +683,9 @@ async ValueTask InternalExecuteAsync(Memory op, Memory clusterOp, st } } + // Console.WriteLine($"Allocated {taskId} @ {address}"); + tcs.nextTaskId = taskId; + unsafe { byte* curr = (byte*)networkWriter.GetPhysicalAddress(address); @@ -705,24 +693,49 @@ async ValueTask InternalExecuteAsync(Memory op, Memory clusterOp, st RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end); RespWriteUtils.TryWriteDirect(op.Span, ref curr, end); - RespWriteUtils.TryWriteBulkString(clusterOp.Span, ref curr, end); - RespWriteUtils.TryWriteUtf8BulkString(nodeId, ref curr, end); - RespWriteUtils.TryWriteArrayItem(currentAddress, ref curr, end); - RespWriteUtils.TryWriteArrayItem(nextAddress, ref curr, end); - RespWriteUtils.TryWriteBulkString(new Span((void*)payloadPtr, payloadLength), ref curr, end); + if (!param1.IsEmpty) + RespWriteUtils.TryWriteBulkString(param1.Span, ref curr, end); + if (!param2.IsEmpty) + RespWriteUtils.TryWriteBulkString(param2.Span, ref curr, end); Debug.Assert(curr == end); } #endregion - if (!IsConnected) + #region waitForEmptySlot + int shortTaskId = taskId & (maxOutstandingTasks - 1); + var oldTcs = tcsArray[shortTaskId]; + //1. if taskType != None, we are waiting for previous task to finish + //2. if taskType == None and my taskId is not the next in line wait for previous task to acquire slot + if (oldTcs.taskType != TaskType.None || !oldTcs.IsNext(taskId)) { - Dispose(); + // Console.WriteLine($"Before filling slot {taskId & (maxOutstandingTasks - 1)} for task {taskId} @ {address} : {tcs.taskType}"); + networkWriter.epoch.ProtectAndDrain(); + networkWriter.DoAggressiveShiftReadOnly(); + try + { + networkWriter.epoch.Suspend(); + await AwaitPreviousTaskAsync(taskId); // does not take token, as task is not cancelable at this point + } + finally + { + networkWriter.epoch.Resume(); + } + } + #endregion + + #region scheduleAwaitForResponse + // Console.WriteLine($"Filled slot {taskId & (maxOutstandingTasks - 1)} for task {taskId} @ {address} : {tcs.taskType}"); + tcsArray[shortTaskId].LoadFrom(tcs); + if (Disposed) + { + DisposeOffset(shortTaskId); ThrowException(disposeException); } // Console.WriteLine($"Filled {address}-{address + totalLen}"); networkWriter.epoch.ProtectAndDrain(); networkWriter.DoAggressiveShiftReadOnly(); + #endregion } finally { @@ -731,31 +744,33 @@ async ValueTask InternalExecuteAsync(Memory op, Memory clusterOp, st return; } - async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, Memory param1, Memory param2, CancellationToken token = default) + void InternalExecuteNoResponse(ref Memory op, ref ReadOnlySpan subop, ref Span param1, ref Span param2, CancellationToken token = default) { - tcs.timestamp = GetTimestamp(); - int totalLen = 0; - int arraySize = 1; + var totalLen = 0; + var arraySize = 4; + totalLen += 1 + NumUtils.CountDigits(arraySize) + 2; + // op (NOTE: true length because op already resp formatted) totalLen += op.Length; - if (!param1.IsEmpty) - { - int len = param1.Length; - totalLen += 1 + 
NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; - } - if (!param2.IsEmpty) + // subop + var len = subop.Length; + totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; + + // param1 + len = param1.Length; + totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; + + // param2 + len = param2.Length; + totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; + + if (totalLen > networkWriter.PageSize) { - int len = param2.Length; - totalLen += 1 + NumUtils.CountDigits(len) + 2 + len + 2; - arraySize++; + var e = new Exception($"Entry of size {totalLen} does not fit on page of size {networkWriter.PageSize}. Try increasing sendPageSize parameter to GarnetClient constructor."); + ThrowException(e); } - totalLen += 1 + NumUtils.CountDigits(arraySize) + 2; - CheckLength(totalLen, tcs); - await InputGateAsync(token); - try { networkWriter.epoch.Resume(); @@ -771,12 +786,12 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, Memory= 0) break; try { networkWriter.epoch.Suspend(); - await flushEvent.WaitAsync(token).ConfigureAwait(false); + flushEvent.Wait(token); } finally { @@ -784,53 +799,24 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory op, Memory CLUSTER = "$7\r\nCLUSTER\r\n"u8.ToArray(); static readonly Memory FAILOVER = "FAILOVER"u8.ToArray(); + /// + /// PUBLISH resp formatted + /// + public static ReadOnlySpan PUBLISH => "PUBLISH"u8; + + /// + /// PUBLISH resp formatted + /// + public static ReadOnlySpan SPUBLISH => "SPUBLISH"u8; + /// /// Issue cluster failover command to replica node /// diff --git a/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs b/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs index a20ae5823d..dbed29ec64 100644 --- a/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs +++ b/libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs @@ -159,23 +159,6 @@ public async Task ExecuteForStringResultWithCancellationAsync(Memory - /// Execute command (async) - /// - /// Operation in resp format - /// - /// - /// - /// - /// - /// - /// - /// - public async Task ExecuteForVoidResultWithCancellationAsync(Memory respOp, Memory clusterOp, string nodeId, long currentAddress, long nextAddress, long payloadPtr, int payloadLength, CancellationToken token = default) - { - await InternalExecuteAsync(respOp, clusterOp, nodeId, currentAddress, nextAddress, payloadPtr, payloadLength, token); - } - /// /// Execute command (async) /// @@ -1061,7 +1044,6 @@ public async Task ExecuteForLongResultWithCancellationAsync(string op, ICo } } - /// /// Execute command (async) with cancellation token /// @@ -1087,7 +1069,17 @@ public async Task ExecuteForLongResultWithCancellationAsync(Memory r } } - + /// + /// Execute command expecting no response + /// + /// + /// + /// + /// + /// + /// + public void ExecuteNoResponse(Memory op, ReadOnlySpan param1, ref Span param2, ref Span param3, CancellationToken token = default) + => InternalExecuteNoResponse(ref op, ref param1, ref param2, ref param3, token); #endregion void TokenRegistrationLongCallback(object s) => ((TaskCompletionSource)s).TrySetCanceled(); diff --git a/libs/client/GarnetClientProcessReplies.cs b/libs/client/GarnetClientProcessReplies.cs index 917057e0f9..7685749914 100644 --- a/libs/client/GarnetClientProcessReplies.cs +++ b/libs/client/GarnetClientProcessReplies.cs @@ -231,11 +231,11 @@ unsafe int ProcessReplies(byte* recvBufferPtr, int bytesRead) Thread.Yield(); continue; } + switch (tcs.taskType) { case TaskType.None: return readHead; - case TaskType.StringCallback: if 
(!ProcessReplyAsString(ref ptr, end, out var resultString, out var error)) return readHead; diff --git a/libs/client/NetworkWriter.cs b/libs/client/NetworkWriter.cs index e9654e9d98..8c9f597ec5 100644 --- a/libs/client/NetworkWriter.cs +++ b/libs/client/NetworkWriter.cs @@ -140,9 +140,10 @@ public long GetTailAddress() /// /// Number of bytes to allocate /// + /// /// The allocated logical address, or negative in case of inability to allocate [MethodImpl(MethodImplOptions.AggressiveInlining)] - private long TryAllocate(int size, out int taskId) + private long TryAllocate(int size, out int taskId, bool skipTaskIdIncrement = false) { PageOffset localTailPageOffset = default; localTailPageOffset.PageAndOffset = TailPageOffset.PageAndOffset; @@ -158,7 +159,7 @@ private long TryAllocate(int size, out int taskId) } // Determine insertion index. - localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, size + (1L << PageOffset.kTaskOffset)); + localTailPageOffset.PageAndOffset = skipTaskIdIncrement ? Interlocked.Add(ref TailPageOffset.PageAndOffset, size) : Interlocked.Add(ref TailPageOffset.PageAndOffset, size + (1L << PageOffset.kTaskOffset)); taskId = localTailPageOffset.PrevTaskId; int page = localTailPageOffset.Page; @@ -214,9 +215,10 @@ private long TryAllocate(int size, out int taskId) /// /// Number of slots to allocate /// + /// /// The allocated logical address [MethodImpl(MethodImplOptions.AggressiveInlining)] - public (int, long) TryAllocate(int size, out CompletionEvent flushEvent) + public (int, long) TryAllocate(int size, out CompletionEvent flushEvent, bool skipTaskIdIncrement = false) { const int kFlushSpinCount = 10; var spins = 0; @@ -224,7 +226,7 @@ private long TryAllocate(int size, out int taskId) { Debug.Assert(epoch.ThisInstanceProtected()); flushEvent = this.FlushEvent; - var logicalAddress = this.TryAllocate(size, out int taskId); + var logicalAddress = this.TryAllocate(size, out int taskId, skipTaskIdIncrement: skipTaskIdIncrement); // Console.WriteLine($"Allocated {logicalAddress}-{logicalAddress + size}"); if (logicalAddress >= 0) diff --git a/libs/cluster/Server/ClusterConfig.cs b/libs/cluster/Server/ClusterConfig.cs index c45320b0ce..c97a66aa1d 100644 --- a/libs/cluster/Server/ClusterConfig.cs +++ b/libs/cluster/Server/ClusterConfig.cs @@ -785,6 +785,32 @@ public List GetReplicas(string nodeid, ClusterProvider clusterProvider) return replicas; } + /// + /// Get all know node ids + /// + public void GetAllNodeIds(out List<(string, string, int)> allNodeIds) + { + allNodeIds = []; + for (ushort i = 2; i < workers.Length; i++) + allNodeIds.Add((workers[i].Nodeid, workers[i].Address, workers[i].Port)); + } + + /// + /// Get node-ids for nodes in the local shard + /// + /// + public void GetNodeIdsForShard(out List<(string, string, int)> shardNodeIds) + { + var primaryId = LocalNodeRole == NodeRole.PRIMARY ? 
LocalNodeId : workers[1].ReplicaOfNodeId; + shardNodeIds = []; + for (ushort i = 2; i < workers.Length; i++) + { + var replicaOf = workers[i].ReplicaOfNodeId; + if (primaryId != null && ((replicaOf != null && replicaOf.Equals(primaryId, StringComparison.OrdinalIgnoreCase)) || primaryId.Equals(workers[i].Nodeid))) + shardNodeIds.Add((workers[i].Nodeid, workers[i].Address, workers[i].Port)); + } + } + /// /// /// diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index c813b716e2..89ba7acc8d 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -333,6 +333,76 @@ public void PurgeBufferPool(ManagerType managerType) throw new GarnetException(); } + public void ExtractKeySpecs(RespCommandsInfo commandInfo, RespCommand cmd, ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi) + { + var specs = commandInfo.KeySpecifications; + switch (specs.Length) + { + case 1: + var searchIndex = (BeginSearchIndex)specs[0].BeginSearch; + csvi.readOnly = specs[0].Flags.HasFlag(KeySpecificationFlags.RO); + switch (specs[0].FindKeys) + { + case FindKeysRange: + var findRange = (FindKeysRange)specs[0].FindKeys; + csvi.firstKey = searchIndex.Index - 1; + csvi.lastKey = findRange.LastKey < 0 ? findRange.LastKey + parseState.Count + 1 : findRange.LastKey - searchIndex.Index + 1; + csvi.step = findRange.KeyStep; + csvi.readOnly = !specs[0].Flags.HasFlag(KeySpecificationFlags.RW); + break; + case FindKeysKeyNum: + var findKeysKeyNum = (FindKeysKeyNum)specs[0].FindKeys; + csvi.firstKey = searchIndex.Index + findKeysKeyNum.FirstKey - 1; + csvi.lastKey = csvi.firstKey + parseState.GetInt(searchIndex.Index + findKeysKeyNum.KeyNumIdx - 1); + csvi.step = findKeysKeyNum.KeyStep; + break; + case FindKeysUnknown: + default: + throw new GarnetException("FindKeys spec not known"); + } + + break; + case 2: + searchIndex = (BeginSearchIndex)specs[0].BeginSearch; + switch (specs[0].FindKeys) + { + case FindKeysRange: + csvi.firstKey = RespCommand.BITOP == cmd ? searchIndex.Index - 2 : searchIndex.Index - 1; + break; + case FindKeysKeyNum: + case FindKeysUnknown: + default: + throw new GarnetException("FindKeys spec not known"); + } + + var searchIndex1 = (BeginSearchIndex)specs[1].BeginSearch; + switch (specs[1].FindKeys) + { + case FindKeysRange: + var findRange = (FindKeysRange)specs[1].FindKeys; + csvi.lastKey = findRange.LastKey < 0 ? 
findRange.LastKey + parseState.Count + 1 : findRange.LastKey + searchIndex1.Index - searchIndex.Index + 1; + csvi.step = findRange.KeyStep; + break; + case FindKeysKeyNum: + var findKeysKeyNum = (FindKeysKeyNum)specs[1].FindKeys; + csvi.keyNumOffset = searchIndex1.Index + findKeysKeyNum.KeyNumIdx - 1; + csvi.lastKey = searchIndex1.Index + parseState.GetInt(csvi.keyNumOffset); + csvi.step = findKeysKeyNum.KeyStep; + break; + case FindKeysUnknown: + default: + throw new GarnetException("FindKeys spec not known"); + } + + break; + default: + throw new GarnetException("KeySpecification not supported count"); + } + } + + public void ClusterPublish(RespCommand cmd, ref Span channel, ref Span message) + => clusterManager.TryClusterPublish(cmd, ref channel, ref message); + internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(StoreType storeType) { Debug.Assert(serverOptions.EnableCluster); diff --git a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs index dcbcc7dda9..f5994c9985 100644 --- a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs +++ b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs @@ -40,29 +40,14 @@ private async Task GetOrAddConnectionAsync(string nodeId) { _ = clusterProvider.clusterManager.clusterConnectionStore.GetConnection(nodeId, out var gsn); - // If connection not available try to initialize it + var (address, port) = oldConfig.GetEndpointFromNodeId(nodeId); + while (!clusterProvider.clusterManager.clusterConnectionStore.GetOrAdd(clusterProvider, address, port, clusterProvider.serverOptions.TlsOptions, nodeId, out gsn, logger: logger)) + _ = System.Threading.Thread.Yield(); + if (gsn == null) { - var (address, port) = oldConfig.GetEndpointFromNodeId(nodeId); - gsn = new GarnetServerNode( - clusterProvider, - address, - port, - clusterProvider.storeWrapper.serverOptions.TlsOptions?.TlsClientOptions, - logger: logger); - - // Try add connection to the connection store - if (!clusterProvider.clusterManager.clusterConnectionStore.AddConnection(gsn)) - { - // If failed to add dispose connection resources - gsn.Dispose(); - // Retry to get established connection if it was added after our first attempt - _ = clusterProvider.clusterManager.clusterConnectionStore.GetConnection(nodeId, out gsn); - } - - // Final check fail, if connection is not established. 
- if (gsn == null) - throw new GarnetException($"Connection not established to node {nodeId}"); + logger?.LogWarning("TryMeet: Could not establish connection to remote node [{nodeId} {address}:{port}] failed", nodeId, address, port); + return null; } await gsn.InitializeAsync(); diff --git a/libs/cluster/Server/GarnetClientExtensions.cs b/libs/cluster/Server/GarnetClientExtensions.cs index c81b226358..0cf3053a0f 100644 --- a/libs/cluster/Server/GarnetClientExtensions.cs +++ b/libs/cluster/Server/GarnetClientExtensions.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Garnet.client; using Garnet.common; +using Garnet.server; namespace Garnet.cluster { @@ -15,6 +16,9 @@ internal static partial class GarnetClientExtensions static readonly Memory GOSSIP = "GOSSIP"u8.ToArray(); static readonly Memory WITHMEET = "WITHMEET"u8.ToArray(); + static Memory PUBLISH => "PUBLISH"u8.ToArray(); + static Memory SPUBLISH => "SPUBLISH"u8.ToArray(); + /// /// Send config /// @@ -60,5 +64,8 @@ public static async Task failreplicationoffset(this GarnetClient client, l }; return await client.ExecuteForLongResultWithCancellationAsync(GarnetClient.CLUSTER, args, cancellationToken).ConfigureAwait(false); } + + public static void ClusterPublishNoResponse(this GarnetClient client, RespCommand cmd, ref Span channel, ref Span message, CancellationToken cancellationToken = default) + => client.ExecuteNoResponse(GarnetClient.CLUSTER, RespCommand.PUBLISH == cmd ? GarnetClient.PUBLISH : GarnetClient.SPUBLISH, ref channel, ref message, cancellationToken); } } \ No newline at end of file diff --git a/libs/cluster/Server/GarnetClusterConnectionStore.cs b/libs/cluster/Server/GarnetClusterConnectionStore.cs index a78583a3be..6c50c2fc5e 100644 --- a/libs/cluster/Server/GarnetClusterConnectionStore.cs +++ b/libs/cluster/Server/GarnetClusterConnectionStore.cs @@ -4,6 +4,7 @@ using System; using System.Security.Cryptography; using Garnet.common; +using Garnet.server.TLS; using Microsoft.Extensions.Logging; namespace Garnet.cluster @@ -92,6 +93,60 @@ public bool AddConnection(GarnetServerNode conn) return true; } + /// + /// Get or add a connection to the store. 
+ /// + /// + /// + /// + /// + /// + /// + /// + /// + public bool GetOrAdd(ClusterProvider clusterProvider, string address, int port, IGarnetTlsOptions tlsOptions, string nodeId, out GarnetServerNode conn, ILogger logger = null) + { + conn = null; + try + { + _lock.WriteLock(); + if (_disposed) return false; + + if (UnsafeGetConnection(nodeId, out conn)) return true; + + conn = new GarnetServerNode(clusterProvider, address, port, tlsOptions?.TlsClientOptions, logger: logger) + { + NodeId = nodeId + }; + + // Iterate array of existing connections + for (int i = 0; i < numConnection; i++) + { + var _conn = connections[i]; + if (_conn.NodeId.Equals(conn.NodeId, StringComparison.OrdinalIgnoreCase)) + { + return false; + } + } + + if (numConnection == connections.Length) + { + var oldArray = connections; + var newArray = new GarnetServerNode[connections.Length * 2]; + Array.Copy(oldArray, newArray, oldArray.Length); + Array.Clear(oldArray); + connections = newArray; + } + connections[numConnection++] = conn; + } + finally + { + _lock.WriteUnlock(); + } + + return true; + } + public void CloseAll() { try diff --git a/libs/cluster/Server/GarnetServerNode.cs b/libs/cluster/Server/GarnetServerNode.cs index dae6afb3e4..0a735588a7 100644 --- a/libs/cluster/Server/GarnetServerNode.cs +++ b/libs/cluster/Server/GarnetServerNode.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Garnet.client; using Garnet.common; +using Garnet.server; using Microsoft.Extensions.Logging; namespace Garnet.cluster @@ -22,7 +23,7 @@ internal sealed class GarnetServerNode CancellationTokenSource internalCts = new(); volatile int initialized = 0; readonly ILogger logger = null; - int disposeCount = 0; + SingleWriterMultiReaderLock dispose; /// /// Last transmitted configuration @@ -59,6 +60,16 @@ internal sealed class GarnetServerNode /// public int Port; + /// + /// Default send page size for GarnetClient + /// + const int defaultSendPageSize = 1 << 17; + + /// + /// Default max outstanding tasks for GarnetClient + /// + const int defaultMaxOutstandingTask = 8; + /// /// GarnetServerNode constructor /// @@ -69,13 +80,15 @@ internal sealed class GarnetServerNode /// public GarnetServerNode(ClusterProvider clusterProvider, string address, int port, SslClientAuthenticationOptions tlsOptions, ILogger logger = null) { + var opts = clusterProvider.storeWrapper.serverOptions; this.clusterProvider = clusterProvider; this.Address = address; this.Port = port; this.gc = new GarnetClient( address, port, tlsOptions, - sendPageSize: 1 << 17, - maxOutstandingTasks: 8, + sendPageSize: opts.DisablePubSub ? defaultSendPageSize : Math.Max(defaultSendPageSize, (int)opts.PubSubPageSizeBytes()), + maxOutstandingTasks: defaultMaxOutstandingTask, + timeoutMilliseconds: opts.ClusterTimeout <= 0 ? 
0 : TimeSpan.FromSeconds(opts.ClusterTimeout).Milliseconds, authUsername: clusterProvider.clusterManager.clusterProvider.ClusterUsername, authPassword: clusterProvider.clusterManager.clusterProvider.ClusterPassword, logger: logger); @@ -93,7 +106,7 @@ public GarnetServerNode(ClusterProvider clusterProvider, string address, int por public Task InitializeAsync() { // Ensure initialize executes only once - if (Interlocked.CompareExchange(ref initialized, 1, 0) != 0) return Task.CompletedTask; + if (initialized != 0 || Interlocked.CompareExchange(ref initialized, 1, 0) != 0) return Task.CompletedTask; cts = CancellationTokenSource.CreateLinkedTokenSource(clusterProvider.clusterManager.ctsGossip.Token, internalCts.Token); return gc.ReconnectAsync().WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token); @@ -101,8 +114,13 @@ public Task InitializeAsync() public void Dispose() { - if (Interlocked.Increment(ref disposeCount) != 1) + // Single write lock acquisition only + if (!dispose.CloseLock()) + { logger?.LogTrace("GarnetServerNode.Dispose called multiple times"); + return; + } + try { cts?.Cancel(); @@ -146,9 +164,9 @@ byte[] GetMostRecentConfig() byte[] byteArray; if (conf != lastConfig) { - if (clusterProvider.replicationManager != null) conf.LazyUpdateLocalReplicationOffset(clusterProvider.replicationManager.ReplicationOffset); - byteArray = conf.ToByteArray(); lastConfig = conf; + if (clusterProvider.replicationManager != null) lastConfig.LazyUpdateLocalReplicationOffset(clusterProvider.replicationManager.ReplicationOffset); + byteArray = lastConfig.ToByteArray(); } else { @@ -246,6 +264,10 @@ public bool TryGossip() return false; } + /// + /// Get connection info + /// + /// public ConnectionInfo GetConnectionInfo() { var nowTicks = DateTimeOffset.UtcNow.Ticks; @@ -259,5 +281,32 @@ public ConnectionInfo GetConnectionInfo() lastIO = last_io_seconds, }; } + + /// + /// Send a CLUSTER PUBLISH message to another remote node + /// + /// + /// + /// + public void TryClusterPublish(RespCommand cmd, ref Span channel, ref Span message) + { + var locked = false; + try + { + // Try to acquire dispose lock to avoid a dispose during publish forwarding + if (!dispose.TryReadLock()) + { + logger?.LogWarning("Could not acquire readLock for publish forwarding"); + return; + } + + locked = true; + gc.ClusterPublishNoResponse(cmd, ref channel, ref message); + } + finally + { + if (locked) dispose.ReadUnlock(); + } + } } } \ No newline at end of file diff --git a/libs/cluster/Server/Gossip.cs b/libs/cluster/Server/Gossip.cs index e1b1987b60..a6c9bcb573 100644 --- a/libs/cluster/Server/Gossip.cs +++ b/libs/cluster/Server/Gossip.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Garnet.common; +using Garnet.server; using Microsoft.Extensions.Logging; namespace Garnet.cluster @@ -200,6 +201,48 @@ public async Task TryMeetAsync(string address, int port, bool acquireLock = true } } + /// + /// Forward message by issuing CLUSTER PUBLISH|SPUBLISH + /// + /// + /// + /// + public void TryClusterPublish(RespCommand cmd, ref Span channel, ref Span message) + { + var conf = CurrentConfig; + List<(string, string, int)> nodeEntries = null; + if (cmd == RespCommand.PUBLISH) + conf.GetAllNodeIds(out nodeEntries); + else + conf.GetNodeIdsForShard(out nodeEntries); + foreach (var entry in nodeEntries) + { + try + { + var nodeId = entry.Item1; + var address = entry.Item2; + var port = entry.Item3; + GarnetServerNode gsn = null; + while (!clusterConnectionStore.GetOrAdd(clusterProvider, 
address, port, tlsOptions, nodeId, out gsn, logger: logger)) + Thread.Yield(); + + if (gsn == null) + continue; + + // Initialize GarnetServerNode + // Thread-Safe initialization executes only once + gsn.InitializeAsync().GetAwaiter().GetResult(); + + // Publish to remote nodes + gsn.TryClusterPublish(cmd, ref channel, ref message); + } + catch (Exception ex) + { + logger?.LogWarning(ex, $"{nameof(ClusterManager)}.{nameof(TryClusterPublish)}"); + } + } + } + /// /// Main gossip async task /// @@ -257,20 +300,25 @@ async Task InitConnections() // Establish new connection only if it is not in banlist and not in dictionary if (!workerBanList.ContainsKey(nodeId) && !clusterConnectionStore.GetConnection(nodeId, out var _)) { - var gsn = new GarnetServerNode(clusterProvider, address, port, tlsOptions?.TlsClientOptions, logger: logger) - { - NodeId = nodeId - }; try { + GarnetServerNode gsn = null; + while (!clusterConnectionStore.GetOrAdd(clusterProvider, address, port, tlsOptions, nodeId, out gsn, logger: logger)) + await Task.Yield(); + + if (gsn == null) + { + logger?.LogWarning("InitConnections: Could not establish connection to remote node [{nodeId} {address}:{port}] failed", nodeId, address, port); + _ = clusterConnectionStore.TryRemove(nodeId); + continue; + } + await gsn.InitializeAsync(); - if (!clusterConnectionStore.AddConnection(gsn)) - gsn.Dispose(); } catch (Exception ex) { - logger?.LogWarning("Connection to remote node [{nodeId} {address}:{port}] failed with message:{msg}", nodeId, address, port, ex.Message); - gsn?.Dispose(); + logger?.LogWarning(ex, "InitConnections: Could not establish connection to remote node [{nodeId} {address}:{port}] failed", nodeId, address, port); + _ = clusterConnectionStore.TryRemove(nodeId); } } } diff --git a/libs/cluster/Session/ClusterCommands.cs b/libs/cluster/Session/ClusterCommands.cs index 33469ce790..d8a682519b 100644 --- a/libs/cluster/Session/ClusterCommands.cs +++ b/libs/cluster/Session/ClusterCommands.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Text; -using Garnet.common; using Garnet.server; namespace Garnet.cluster @@ -165,6 +164,7 @@ private void ProcessClusterCommands(RespCommand command, out bool invalidParamet RespCommand.CLUSTER_MYID => NetworkClusterMyId(out invalidParameters), RespCommand.CLUSTER_MYPARENTID => NetworkClusterMyParentId(out invalidParameters), RespCommand.CLUSTER_NODES => NetworkClusterNodes(out invalidParameters), + RespCommand.CLUSTER_PUBLISH or RespCommand.CLUSTER_SPUBLISH => NetworkClusterPublish(out invalidParameters), RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(out invalidParameters), RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(out invalidParameters), RespCommand.CLUSTER_RESET => NetworkClusterReset(out invalidParameters), diff --git a/libs/cluster/Session/ClusterSession.cs b/libs/cluster/Session/ClusterSession.cs index 40255ea30e..df90f4f8f2 100644 --- a/libs/cluster/Session/ClusterSession.cs +++ b/libs/cluster/Session/ClusterSession.cs @@ -27,6 +27,7 @@ internal sealed unsafe partial class ClusterSession : IClusterSession BasicGarnetApi basicGarnetApi; readonly INetworkSender networkSender; readonly ILogger logger; + ClusterSlotVerificationInput csvi; // Authenticator used to validate permissions for cluster commands readonly IGarnetAuthenticator authenticator; @@ -74,6 +75,14 @@ public void ProcessClusterCommands(RespCommand command, ref SessionParseState pa { if (command.IsClusterSubCommand()) { + if 
(RespCommandsInfo.TryGetRespCommandInfo(command, out var commandInfo) && commandInfo.KeySpecifications != null) + { + csvi.keyNumOffset = -1; + clusterProvider.ExtractKeySpecs(commandInfo, command, ref parseState, ref csvi); + if (NetworkMultiKeySlotVerifyNoResponse(ref parseState, ref csvi, ref this.dcurr, ref this.dend)) + return; + } + ProcessClusterCommands(command, out invalidParameters); if (invalidParameters) diff --git a/libs/cluster/Session/RespClusterBasicCommands.cs b/libs/cluster/Session/RespClusterBasicCommands.cs index 16b5683b70..7bddbc7b0a 100644 --- a/libs/cluster/Session/RespClusterBasicCommands.cs +++ b/libs/cluster/Session/RespClusterBasicCommands.cs @@ -293,7 +293,7 @@ private bool NetworkClusterSetConfigEpoch(out bool invalidParameters) return true; } - if (clusterProvider.clusterManager.CurrentConfig.NumWorkers > 2) + if (clusterProvider.clusterManager.CurrentConfig.NumWorkers > 1) { while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_CONFIG_EPOCH_ASSIGNMENT, ref dcurr, dend)) SendAndReset(); @@ -450,5 +450,33 @@ private bool NetworkClusterReset(out bool invalidParameters) return true; } + + /// + /// Implement CLUSTER PUBLISH command + /// + /// + /// + private bool NetworkClusterPublish(out bool invalidParameters) + { + invalidParameters = false; + + // CLUSTER PUBLISH|SPUBLISH channel message + // Expecting exactly 2 arguments + if (parseState.Count != 2) + { + invalidParameters = true; + return true; + } + + if (clusterProvider.storeWrapper.subscribeBroker == null) + { + while (!RespWriteUtils.TryWriteError("ERR PUBLISH is disabled, enable it with --pubsub option."u8, ref dcurr, dend)) + SendAndReset(); + return true; + } + + clusterProvider.storeWrapper.subscribeBroker.Publish(ref parseState, true); + return true; + } } } \ No newline at end of file diff --git a/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs b/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs index 31e5390b6e..af69ed8d2b 100644 --- a/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs +++ b/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs @@ -110,6 +110,14 @@ public bool NetworkKeyArraySlotVerify(Span keys, bool readOnly, byte s return true; } + /// + /// Verify multi-key slot ownership + /// + /// + /// + /// + /// + /// public unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend) { // If cluster is not enabled or a transaction is running skip slot check @@ -124,5 +132,24 @@ public unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, r WriteClusterSlotVerificationMessage(config, vres, ref dcurr, ref dend); return true; } + + /// + /// Verify multi-key slot ownership without generating a response + /// + /// + /// + /// + /// + /// + public unsafe bool NetworkMultiKeySlotVerifyNoResponse(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend) + { + // If cluster is not enabled or a transaction is running skip slot check + if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false; + + var config = clusterProvider.clusterManager.CurrentConfig; + var vres = MultiKeySlotVerify(config, ref parseState, ref csvi); + + return vres.state != SlotVerifiedState.OK; + } } } \ No newline at end of file diff --git a/libs/common/SingleWriterMultiReaderLock.cs b/libs/common/SingleWriterMultiReaderLock.cs index 6c448399f5..c32db2876c 100644 --- 
a/libs/common/SingleWriterMultiReaderLock.cs +++ b/libs/common/SingleWriterMultiReaderLock.cs @@ -88,11 +88,12 @@ public void ReadUnlock() } /// - /// Continuously attempt to acquire write lock until lock is acquired or it is write locked + /// Try acquire write lock and spin wait until isWriteLocked + /// NOTE: once closed this lock should never be unlocked because is considered disposed /// /// Return true if current thread is the one that acquired write lock [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool OneWriteLock() + public bool CloseLock() { while (true) { diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index c4af9b8dfe..973399ad80 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -246,7 +246,7 @@ private void InitializeServer() this.server ??= new GarnetServerTcp(opts.Address, opts.Port, 0, opts.TlsOptions, opts.NetworkSendThrottleMax, opts.NetworkConnectionLimit, logger); storeWrapper = new StoreWrapper(version, redisProtocolVersion, server, store, objectStore, objectStoreSizeTracker, - customCommandManager, appendOnlyFile, opts, clusterFactory: clusterFactory, loggerFactory: loggerFactory); + customCommandManager, appendOnlyFile, opts, subscribeBroker, clusterFactory: clusterFactory, loggerFactory: loggerFactory); // Create session provider for Garnet Provider = new GarnetProvider(storeWrapper, subscribeBroker); diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json index 456cf2345f..a22cf66e5b 100644 --- a/libs/resources/RespCommandsDocs.json +++ b/libs/resources/RespCommandsDocs.json @@ -6589,6 +6589,27 @@ } ] }, + { + "Command": "SPUBLISH", + "Name": "SPUBLISH", + "Summary": "Posts a message to a shard channel.", + "Group": "PubSub", + "Complexity": "O(N) where N is the number of clients subscribed to the receiving shard channel.", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "SHARDCHANNEL", + "DisplayText": "shardchannel", + "Type": "String" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "MESSAGE", + "DisplayText": "message", + "Type": "String" + } + ] + }, { "Command": "STRLEN", "Name": "STRLEN", @@ -6621,6 +6642,22 @@ } ] }, + { + "Command": "SSUBSCRIBE", + "Name": "SSUBSCRIBE", + "Summary": "Listens for messages published to shard channels.", + "Group": "PubSub", + "Complexity": "O(N) where N is the number of shard channels to subscribe to.", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "shardchannel", + "DisplayText": "channel", + "Type": "String", + "ArgumentFlags": "Multiple" + } + ] + }, { "Command": "SUBSTR", "Name": "SUBSTR", diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json index 0ec5c0b623..159b575b5a 100644 --- a/libs/resources/RespCommandsInfo.json +++ b/libs/resources/RespCommandsInfo.json @@ -617,6 +617,43 @@ "Flags": "Admin, NoMulti, NoScript", "AclCategories": "Admin, Dangerous, Slow, Garnet" }, + { + "Command": "CLUSTER_PUBLISH", + "Name": "CLUSTER|PUBLISH", + "IsInternal": true, + "Arity": 4, + "Flags": "Loading, NoScript, PubSub, Stale", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Admin, PubSub, Slow, Garnet" + }, + { + "Command": "CLUSTER_SPUBLISH", + "Name": "CLUSTER|SPUBLISH", + "IsInternal": true, + "Arity": 4, + "Flags": "Loading, NoScript, PubSub, Stale", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Admin, PubSub, Slow, Garnet", + "KeySpecifications": [ + { + "BeginSearch": { + 
"TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Flags": "RO" + } + ] + }, { "Command": "CLUSTER_BUMPEPOCH", "Name": "CLUSTER|BUMPEPOCH", @@ -4347,6 +4384,56 @@ "Flags": "Loading, NoScript, PubSub, Stale", "AclCategories": "PubSub, Slow" }, + { + "Command": "SSUBSCRIBE", + "Name": "SSUBSCRIBE", + "Arity": -2, + "Flags": "Loading, NoScript, PubSub, Stale", + "FirstKey": 1, + "LastKey": -1, + "Step": 1, + "AclCategories": "PubSub, Slow, Read", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": -1, + "KeyStep": 1, + "Limit": 0 + }, + "Flags": "RO" + } + ] + }, + { + "Command": "SPUBLISH", + "Name": "SPUBLISH", + "Arity": 3, + "Flags": "Loading, NoScript, PubSub, Stale", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "PubSub, Slow, Read", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Flags": "RO" + } + ] + }, { "Command": "SUBSTR", "Name": "SUBSTR", diff --git a/libs/server/AOF/AofProcessor.cs b/libs/server/AOF/AofProcessor.cs index 2a916e3689..aed7b5d27f 100644 --- a/libs/server/AOF/AofProcessor.cs +++ b/libs/server/AOF/AofProcessor.cs @@ -78,6 +78,7 @@ public AofProcessor( storeWrapper.customCommandManager, recordToAof ? storeWrapper.appendOnlyFile : null, storeWrapper.serverOptions, + storeWrapper.subscribeBroker, accessControlList: storeWrapper.accessControlList, loggerFactory: storeWrapper.loggerFactory); diff --git a/libs/server/Cluster/IClusterProvider.cs b/libs/server/Cluster/IClusterProvider.cs index e3ebf5e7f8..c19d4964a2 100644 --- a/libs/server/Cluster/IClusterProvider.cs +++ b/libs/server/Cluster/IClusterProvider.cs @@ -69,6 +69,21 @@ public interface IClusterProvider : IDisposable void PurgeBufferPool(ManagerType managerType); /// + /// Extract key specs + /// + /// + /// + /// + void ExtractKeySpecs(RespCommandsInfo commandInfo, RespCommand cmd, ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi); + + /// + /// Issue a cluster publish message to remote nodes + /// + /// + /// + /// + void ClusterPublish(RespCommand cmd, ref Span channel, ref Span message); + /// Is Primary /// /// diff --git a/libs/server/Cluster/IClusterSession.cs b/libs/server/Cluster/IClusterSession.cs index a7f76a8807..9e23d4b375 100644 --- a/libs/server/Cluster/IClusterSession.cs +++ b/libs/server/Cluster/IClusterSession.cs @@ -88,6 +88,16 @@ public interface IClusterSession /// unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend); + /// + /// Array slot verify with no response + /// + /// + /// + /// + /// + /// + unsafe bool NetworkMultiKeySlotVerifyNoResponse(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend); + /// /// Sets the user currently authenticated in this session (used for permission checks) /// diff --git a/libs/server/PubSub/SubscribeBroker.cs b/libs/server/PubSub/SubscribeBroker.cs index f63cabd220..a6c5a719d9 100644 --- a/libs/server/PubSub/SubscribeBroker.cs +++ b/libs/server/PubSub/SubscribeBroker.cs @@ -383,14 +383,21 @@ public unsafe int PublishNow(byte* key, byte* value, int valueLength, bool 
ascii /// /// Publish the update made to key to all the subscribers, asynchronously /// - /// key that has been updated - /// value that has been updated - /// value length that has been updated + /// ParseState for publish message /// is payload ascii - public unsafe void Publish(byte* key, byte* value, int valueLength, bool ascii = false) + public unsafe void Publish(ref SessionParseState parseState, bool ascii = false) { if (subscriptions == null && prefixSubscriptions == null) return; + var key = parseState.GetArgSliceByRef(0).ptr - sizeof(int); + var value = parseState.GetArgSliceByRef(1).ptr - sizeof(int); + + var kSize = parseState.GetArgSliceByRef(0).Length; + var vSize = parseState.GetArgSliceByRef(1).Length; + *(int*)key = kSize; + *(int*)value = vSize; + var valueLength = vSize + sizeof(int); + var start = key; ref TKey k = ref keySerializer.ReadKeyByRef(ref key); // TODO: this needs to be a single atomic enqueue diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index 7174d6c6f1..bdafb73705 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -15,6 +15,9 @@ static partial class CmdStrings /// public static ReadOnlySpan CLIENT => "CLIENT"u8; public static ReadOnlySpan SUBSCRIBE => "SUBSCRIBE"u8; + public static ReadOnlySpan subscribe => "subcribe"u8; + public static ReadOnlySpan SSUBSCRIBE => "SSUBSCRIBE"u8; + public static ReadOnlySpan ssubscribe => "ssubcribe"u8; public static ReadOnlySpan RUNTXP => "RUNTXP"u8; public static ReadOnlySpan GET => "GET"u8; public static ReadOnlySpan get => "get"u8; @@ -359,6 +362,8 @@ static partial class CmdStrings public static ReadOnlySpan delkeysinslotrange => "DELKEYSINSLOTRANGE"u8; public static ReadOnlySpan setslotsrange => "SETSLOTSRANGE"u8; public static ReadOnlySpan slotstate => "SLOTSTATE"u8; + public static ReadOnlySpan publish => "PUBLISH"u8; + public static ReadOnlySpan spublish => "SPUBLISH"u8; public static ReadOnlySpan mtasks => "MTASKS"u8; public static ReadOnlySpan aofsync => "AOFSYNC"u8; public static ReadOnlySpan appendlog => "APPENDLOG"u8; diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index 637a2005b6..dbb14372ce 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -70,8 +70,10 @@ public enum RespCommand : ushort SISMEMBER, SMEMBERS, SMISMEMBER, + SPUBLISH, SRANDMEMBER, SSCAN, + SSUBSCRIBE, STRLEN, SUBSTR, SUNION, @@ -333,6 +335,8 @@ public enum RespCommand : ushort CLUSTER_MYID, CLUSTER_MYPARENTID, CLUSTER_NODES, + CLUSTER_PUBLISH, + CLUSTER_SPUBLISH, CLUSTER_REPLICAS, CLUSTER_REPLICATE, CLUSTER_RESET, @@ -707,6 +711,7 @@ private RespCommand FastParseCommand(out int count) (2 << 4) | 6 when lastWord == MemoryMarshal.Read("GETSET\r\n"u8) => RespCommand.GETSET, (2 << 4) | 7 when lastWord == MemoryMarshal.Read("UBLISH\r\n"u8) && ptr[8] == 'P' => RespCommand.PUBLISH, (2 << 4) | 7 when lastWord == MemoryMarshal.Read("FMERGE\r\n"u8) && ptr[8] == 'P' => RespCommand.PFMERGE, + (2 << 4) | 8 when lastWord == MemoryMarshal.Read("UBLISH\r\n"u8) && *(ushort*)(ptr + 8) == MemoryMarshal.Read("SP"u8) => RespCommand.SPUBLISH, (2 << 4) | 5 when lastWord == MemoryMarshal.Read("\nSETNX\r\n"u8) => RespCommand.SETNX, (3 << 4) | 5 when lastWord == MemoryMarshal.Read("\nSETEX\r\n"u8) => RespCommand.SETEX, (3 << 4) | 6 when lastWord == MemoryMarshal.Read("PSETEX\r\n"u8) => RespCommand.PSETEX, @@ -728,7 +733,6 @@ private RespCommand FastParseCommand(out int count) >= ((6 << 4) | 2) and <= ((6 << 4) | 
5) when lastWord == MemoryMarshal.Read("BITPOS\r\n"u8) => RespCommand.BITPOS, >= ((7 << 4) | 2) and <= ((7 << 4) | 3) when lastWord == MemoryMarshal.Read("EXPIRE\r\n"u8) && ptr[8] == 'P' => RespCommand.PEXPIRE, >= ((8 << 4) | 1) and <= ((8 << 4) | 4) when lastWord == MemoryMarshal.Read("TCOUNT\r\n"u8) && *(ushort*)(ptr + 8) == MemoryMarshal.Read("BI"u8) => RespCommand.BITCOUNT, - _ => MatchedNone(this, oldReadHead) } }; @@ -1450,6 +1454,12 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan return RespCommand.HEXPIREAT; } break; + case 10: + if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("SSUBSCRI"u8) && *(uint*)(ptr + 11) == MemoryMarshal.Read("BE\r\n"u8)) + { + return RespCommand.SSUBSCRIBE; + } + break; } // Reset optimistically changed state, if no matching command was found @@ -1668,6 +1678,10 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan speci { return RespCommand.SUBSCRIBE; } + else if (command.SequenceEqual(CmdStrings.SSUBSCRIBE)) + { + return RespCommand.SSUBSCRIBE; + } else if (command.SequenceEqual(CmdStrings.RUNTXP)) { return RespCommand.RUNTXP; @@ -1956,6 +1970,14 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan speci { return RespCommand.CLUSTER_SLOTSTATE; } + else if (subCommand.SequenceEqual(CmdStrings.publish)) + { + return RespCommand.CLUSTER_PUBLISH; + } + else if (subCommand.SequenceEqual(CmdStrings.spublish)) + { + return RespCommand.CLUSTER_SPUBLISH; + } else if (subCommand.SequenceEqual(CmdStrings.MIGRATE)) { return RespCommand.CLUSTER_MIGRATE; diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs index 10131453cc..3c248dbcf1 100644 --- a/libs/server/Resp/PubSubCommands.cs +++ b/libs/server/Resp/PubSubCommands.cs @@ -94,11 +94,25 @@ public override unsafe void PrefixPublish(byte* patternPtr, int patternLength, r /// /// PUBLISH /// - private bool NetworkPUBLISH() + private bool NetworkPUBLISH(RespCommand cmd) { if (parseState.Count != 2) { - return AbortWithWrongNumberOfArguments(nameof(RespCommand.PUBLISH)); + var cmdName = cmd switch + { + RespCommand.PUBLISH => nameof(RespCommand.PUBLISH), + RespCommand.SPUBLISH => nameof(RespCommand.SPUBLISH), + _ => throw new NotImplementedException() + }; + return AbortWithWrongNumberOfArguments(cmdName); + } + + if (cmd == RespCommand.SPUBLISH && clusterSession == null) + { + // Print error message + while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_CLUSTER_DISABLED, ref dcurr, dend)) + SendAndReset(); + return true; } Debug.Assert(isSubscriptionSession == false); @@ -123,21 +137,49 @@ private bool NetworkPUBLISH() *(int*)valPtr = vSize; var numClients = subscribeBroker.PublishNow(keyPtr, valPtr, vSize + sizeof(int), true); + if (storeWrapper.serverOptions.EnableCluster) + { + var _key = parseState.GetArgSliceByRef(0).Span; + var _val = parseState.GetArgSliceByRef(1).Span; + storeWrapper.clusterProvider.ClusterPublish(cmd, ref _key, ref _val); + } + while (!RespWriteUtils.TryWriteInt32(numClients, ref dcurr, dend)) SendAndReset(); return true; } - private bool NetworkSUBSCRIBE() + private bool NetworkSUBSCRIBE(RespCommand cmd) { if (parseState.Count < 1) { - return AbortWithWrongNumberOfArguments(nameof(RespCommand.SUBSCRIBE)); + var cmdName = cmd switch + { + RespCommand.SUBSCRIBE => nameof(RespCommand.SUBSCRIBE), + RespCommand.SSUBSCRIBE => nameof(RespCommand.SSUBSCRIBE), + _ => throw new NotImplementedException() + }; + return AbortWithWrongNumberOfArguments(cmdName); + } + + if (cmd == 
RespCommand.SSUBSCRIBE && clusterSession == null) + { + // Print error message + while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_CLUSTER_DISABLED, ref dcurr, dend)) + SendAndReset(); + return true; } - // SUBSCRIBE channel1 channel2.. ==> [$9\r\nSUBSCRIBE\r\n$]8\r\nchannel1\r\n$8\r\nchannel2\r\n => Subscribe to channel1 and channel2 var disabledBroker = subscribeBroker == null; + var header = cmd switch + { + RespCommand.SUBSCRIBE => CmdStrings.subscribe, + RespCommand.SSUBSCRIBE => CmdStrings.ssubscribe, + _ => throw new NotImplementedException() + }; + + // SUBSCRIBE|SUBSCRIBE channel1 channel2.. ==> [$9\r\nSUBSCRIBE\r\n$]8\r\nchannel1\r\n$8\r\nchannel2\r\n => Subscribe to channel1 and channel2 for (var c = 0; c < parseState.Count; c++) { var key = parseState.GetArgSliceByRef(c).SpanByte; @@ -150,8 +192,9 @@ private bool NetworkSUBSCRIBE() while (!RespWriteUtils.TryWriteArrayLength(3, ref dcurr, dend)) SendAndReset(); - while (!RespWriteUtils.TryWriteBulkString("subscribe"u8, ref dcurr, dend)) + while (!RespWriteUtils.TryWriteBulkString(header, ref dcurr, dend)) SendAndReset(); + while (!RespWriteUtils.TryWriteBulkString(new Span(keyPtr + sizeof(int), kSize), ref dcurr, dend)) SendAndReset(); diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index 4a6a8e5b6c..940d147be2 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -574,7 +574,8 @@ private bool ProcessBasicCommands(RespCommand cmd, ref TGarnetApi st RespCommand.GETBIT => NetworkStringGetBit(ref storageApi), RespCommand.BITCOUNT => NetworkStringBitCount(ref storageApi), RespCommand.BITPOS => NetworkStringBitPosition(ref storageApi), - RespCommand.PUBLISH => NetworkPUBLISH(), + RespCommand.PUBLISH => NetworkPUBLISH(RespCommand.PUBLISH), + RespCommand.SPUBLISH => NetworkPUBLISH(RespCommand.SPUBLISH), RespCommand.PING => parseState.Count == 0 ? NetworkPING() : NetworkArrayPING(), RespCommand.ASKING => NetworkASKING(), RespCommand.MULTI => NetworkMULTI(), @@ -614,7 +615,8 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st RespCommand.WATCHMS => NetworkWATCH_MS(), RespCommand.WATCHOS => NetworkWATCH_OS(), // Pub/sub commands - RespCommand.SUBSCRIBE => NetworkSUBSCRIBE(), + RespCommand.SUBSCRIBE => NetworkSUBSCRIBE(cmd), + RespCommand.SSUBSCRIBE => NetworkSUBSCRIBE(cmd), RespCommand.PSUBSCRIBE => NetworkPSUBSCRIBE(), RespCommand.UNSUBSCRIBE => NetworkUNSUBSCRIBE(), RespCommand.PUNSUBSCRIBE => NetworkPUNSUBSCRIBE(), diff --git a/libs/server/Resp/RespServerSessionSlotVerify.cs b/libs/server/Resp/RespServerSessionSlotVerify.cs index 323fb50be0..9de8ee1c18 100644 --- a/libs/server/Resp/RespServerSessionSlotVerify.cs +++ b/libs/server/Resp/RespServerSessionSlotVerify.cs @@ -3,7 +3,6 @@ using System; using System.Diagnostics; -using Garnet.common; namespace Garnet.server { @@ -41,68 +40,7 @@ bool CanServeSlot(RespCommand cmd) return true; csvi.keyNumOffset = -1; - var specs = commandInfo.KeySpecifications; - switch (specs.Length) - { - case 1: - var searchIndex = (BeginSearchIndex)specs[0].BeginSearch; - - switch (specs[0].FindKeys) - { - case FindKeysRange: - var findRange = (FindKeysRange)specs[0].FindKeys; - csvi.firstKey = searchIndex.Index - 1; - csvi.lastKey = findRange.LastKey < 0 ? 
findRange.LastKey + parseState.Count + 1 : findRange.LastKey - searchIndex.Index + 1; - csvi.step = findRange.KeyStep; - break; - case FindKeysKeyNum: - var findKeysKeyNum = (FindKeysKeyNum)specs[0].FindKeys; - csvi.firstKey = searchIndex.Index + findKeysKeyNum.FirstKey - 1; - csvi.lastKey = csvi.firstKey + parseState.GetInt(searchIndex.Index + findKeysKeyNum.KeyNumIdx - 1); - csvi.step = findKeysKeyNum.KeyStep; - break; - case FindKeysUnknown: - default: - throw new GarnetException("FindKeys spec not known"); - } - - break; - case 2: - searchIndex = (BeginSearchIndex)specs[0].BeginSearch; - switch (specs[0].FindKeys) - { - case FindKeysRange: - csvi.firstKey = RespCommand.BITOP == cmd ? searchIndex.Index - 2 : searchIndex.Index - 1; - break; - case FindKeysKeyNum: - case FindKeysUnknown: - default: - throw new GarnetException("FindKeys spec not known"); - } - - var searchIndex1 = (BeginSearchIndex)specs[1].BeginSearch; - switch (specs[1].FindKeys) - { - case FindKeysRange: - var findRange = (FindKeysRange)specs[1].FindKeys; - csvi.lastKey = findRange.LastKey < 0 ? findRange.LastKey + parseState.Count + 1 : findRange.LastKey + searchIndex1.Index - searchIndex.Index + 1; - csvi.step = findRange.KeyStep; - break; - case FindKeysKeyNum: - var findKeysKeyNum = (FindKeysKeyNum)specs[1].FindKeys; - csvi.keyNumOffset = searchIndex1.Index + findKeysKeyNum.KeyNumIdx - 1; - csvi.lastKey = searchIndex1.Index + parseState.GetInt(csvi.keyNumOffset); - csvi.step = findKeysKeyNum.KeyStep; - break; - case FindKeysUnknown: - default: - throw new GarnetException("FindKeys spec not known"); - } - - break; - default: - throw new GarnetException("KeySpecification not supported count"); - } + storeWrapper.clusterProvider.ExtractKeySpecs(commandInfo, cmd, ref parseState, ref csvi); csvi.readOnly = cmd.IsReadOnly(); csvi.sessionAsking = SessionAsking; return !clusterSession.NetworkMultiKeySlotVerify(ref parseState, ref csvi, ref dcurr, ref dend); diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index 6da4f7fca5..0bef69a596 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -47,6 +47,12 @@ public sealed class StoreWrapper /// Server options /// public readonly GarnetServerOptions serverOptions; + + /// + /// Subscribe broker + /// + public readonly SubscribeBroker> subscribeBroker; + internal readonly IClusterProvider clusterProvider; /// @@ -124,6 +130,7 @@ public StoreWrapper( CustomCommandManager customCommandManager, TsavoriteLog appendOnlyFile, GarnetServerOptions serverOptions, + SubscribeBroker> subscribeBroker, AccessControlList accessControlList = null, IClusterFactory clusterFactory = null, ILoggerFactory loggerFactory = null @@ -137,6 +144,7 @@ public StoreWrapper( this.objectStore = objectStore; this.appendOnlyFile = appendOnlyFile; this.serverOptions = serverOptions; + this.subscribeBroker = subscribeBroker; lastSaveTime = DateTimeOffset.FromUnixTimeSeconds(0); this.customCommandManager = customCommandManager; this.monitor = serverOptions.MetricsSamplingFrequency > 0 ? 
new GarnetServerMonitor(this, serverOptions, server, loggerFactory?.CreateLogger("GarnetServerMonitor")) : null; diff --git a/libs/server/Transaction/TxnKeyManager.cs b/libs/server/Transaction/TxnKeyManager.cs index 7238a9ecb6..2e525eee68 100644 --- a/libs/server/Transaction/TxnKeyManager.cs +++ b/libs/server/Transaction/TxnKeyManager.cs @@ -189,6 +189,7 @@ private static int AdminCommands(RespCommand command) RespCommand.CLIENT => 1, RespCommand.PING => 1, RespCommand.PUBLISH => 1, + RespCommand.SPUBLISH => 1, _ => -1 }; } diff --git a/playground/CommandInfoUpdater/GarnetCommandsDocs.json b/playground/CommandInfoUpdater/GarnetCommandsDocs.json index 1599c22037..84c80cea0e 100644 --- a/playground/CommandInfoUpdater/GarnetCommandsDocs.json +++ b/playground/CommandInfoUpdater/GarnetCommandsDocs.json @@ -763,5 +763,28 @@ ] } ] + }, + { + "Command": "CLUSTER", + "Name": "CLUSTER", + "Summary": "A container for Redis Cluster internal commands.", + "Group": "Cluster", + "Complexity": "Depends on subcommand.", + "SubCommands": [ + { + "Command": "CLUSTER_PUBLISH", + "Name": "CLUSTER|PUBLISH", + "Summary": "Processes a forwarded published message from any node in the cluster", + "Group": "Cluster", + "Complexity": "O(1)" + }, + { + "Command": "CLUSTER_SPUBLISH", + "Name": "CLUSTER|SPUBLISH", + "Summary": "Processes a forwarded published message from a node in the same shard", + "Group": "Cluster", + "Complexity": "O(1)" + } + ] } ] \ No newline at end of file diff --git a/playground/CommandInfoUpdater/GarnetCommandsInfo.json b/playground/CommandInfoUpdater/GarnetCommandsInfo.json index b7adc23f4c..04570affc7 100644 --- a/playground/CommandInfoUpdater/GarnetCommandsInfo.json +++ b/playground/CommandInfoUpdater/GarnetCommandsInfo.json @@ -207,6 +207,43 @@ "KeySpecifications": null, "SubCommands": null }, + { + "Command": "CLUSTER_PUBLISH", + "Name": "CLUSTER|PUBLISH", + "IsInternal": true, + "Arity": 4, + "Flags": "Loading, NoScript, PubSub, Stale", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Admin, PubSub, Slow, Garnet" + }, + { + "Command": "CLUSTER_SPUBLISH", + "Name": "CLUSTER|SPUBLISH", + "IsInternal": true, + "Arity": 4, + "Flags": "Loading, NoScript, PubSub, Stale", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Admin, PubSub, Slow, Garnet", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Flags": "RO" + } + ] + }, { "Command": "CLUSTER_SEND_CKPT_FILE_SEGMENT", "Name": "CLUSTER|SEND_CKPT_FILE_SEGMENT", diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs index 841b584252..33b49862e9 100644 --- a/playground/CommandInfoUpdater/SupportedCommand.cs +++ b/playground/CommandInfoUpdater/SupportedCommand.cs @@ -83,6 +83,8 @@ public class SupportedCommand new("CLUSTER|MYID", RespCommand.CLUSTER_MYID), new("CLUSTER|MYPARENTID", RespCommand.CLUSTER_MYPARENTID), new("CLUSTER|NODES", RespCommand.CLUSTER_NODES), + new("CLUSTER|PUBLISH", RespCommand.CLUSTER_PUBLISH), + new("CLUSTER|SPUBLISH", RespCommand.CLUSTER_SPUBLISH), new("CLUSTER|REPLICAS", RespCommand.CLUSTER_REPLICAS), new("CLUSTER|REPLICATE", RespCommand.CLUSTER_REPLICATE), new("CLUSTER|RESET", RespCommand.CLUSTER_RESET), @@ -93,7 +95,7 @@ public class SupportedCommand new("CLUSTER|SETSLOTSRANGE", RespCommand.CLUSTER_SETSLOTSRANGE), new("CLUSTER|SHARDS", 
RespCommand.CLUSTER_SHARDS), new("CLUSTER|SLOTS", RespCommand.CLUSTER_SLOTS), - new("CLUSTER|SLOTSTATE", RespCommand.CLUSTER_SLOTSTATE), + new("CLUSTER|SLOTSTATE", RespCommand.CLUSTER_SLOTSTATE) ]), new("COMMAND", RespCommand.COMMAND, [ @@ -266,11 +268,13 @@ public class SupportedCommand new("SMISMEMBER", RespCommand.SMISMEMBER), new("SMOVE", RespCommand.SMOVE), new("SPOP", RespCommand.SPOP), + new("SPUBLISH", RespCommand.SPUBLISH), new("SRANDMEMBER", RespCommand.SRANDMEMBER), new("SREM", RespCommand.SREM), new("SSCAN", RespCommand.SSCAN), new("STRLEN", RespCommand.STRLEN), new("SUBSCRIBE", RespCommand.SUBSCRIBE), + new("SSUBSCRIBE", RespCommand.SSUBSCRIBE), new("SUBSTR", RespCommand.SUBSTR), new("SUNION", RespCommand.SUNION), new("SUNIONSTORE", RespCommand.SUNIONSTORE), diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs index 733aec785c..3bb8fe5149 100644 --- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs +++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs @@ -13,7 +13,6 @@ using Garnet.server.ACL; using NUnit.Framework; using NUnit.Framework.Legacy; -using StackExchange.Redis; namespace Garnet.test.Resp.ACL { @@ -780,6 +779,64 @@ static async Task DoClientSetInfoAsync(GarnetClient client) } } + [Test] + public async Task SSubscribeACLsAsync() + { + // SUBSCRIBE is sufficient weird that all we care to test is forbidding it + await CheckCommandsAsync( + "SSUBSCRIBE", + [DoSSubscribeAsync], + skipPermitted: true + ); + + static async Task DoSSubscribeAsync(GarnetClient client) + { + try + { + await client.ExecuteForStringResultAsync("SSUBSCRIBE", ["channel"]); + Assert.Fail("Shouldn't be reachable, cluster isn't enabled"); + } + catch (Exception e) + { + if (e.Message == "ERR This instance has cluster support disabled") + { + return; + } + + throw; + } + } + } + + [Test] + public async Task SPublishACLsAsync() + { + // SUBSCRIBE is sufficient weird that all we care to test is forbidding it + await CheckCommandsAsync( + "SPUBLISH", + [DoSPublishAsync], + skipPermitted: true + ); + + static async Task DoSPublishAsync(GarnetClient client) + { + try + { + await client.ExecuteForStringResultAsync("SPUBLISH", ["channel", "message"]); + Assert.Fail("Shouldn't be reachable, cluster isn't enabled"); + } + catch (Exception e) + { + if (e.Message == "ERR This instance has cluster support disabled") + { + return; + } + + throw; + } + } + } + [Test] public async Task ClientUnblockACLsAsync() { @@ -2189,6 +2246,64 @@ static async Task DoClusterSlotStateAsync(GarnetClient client) } } + [Test] + public async Task ClusterPublishACLsAsync() + { + // All cluster command "success" is a thrown exception, because clustering is disabled + + await CheckCommandsAsync( + "CLUSTER PUBLISH", + [DoClusterPublishAsync] + ); + + static async Task DoClusterPublishAsync(GarnetClient client) + { + try + { + await client.ExecuteForStringResultAsync("CLUSTER", ["PUBLISH", "channel", "message"]); + Assert.Fail("Shouldn't be reachable, cluster isn't enabled"); + } + catch (Exception e) + { + if (e.Message == "ERR This instance has cluster support disabled") + { + return; + } + + throw; + } + } + } + + [Test] + public async Task ClusterSPublishACLsAsync() + { + // All cluster command "success" is a thrown exception, because clustering is disabled + + await CheckCommandsAsync( + "CLUSTER SPUBLISH", + [DoClusterSPublishAsync] + ); + + static async Task DoClusterSPublishAsync(GarnetClient client) + { + try + { + await client.ExecuteForStringResultAsync("CLUSTER", ["SPUBLISH", 
"channel", "message"]); + Assert.Fail("Shouldn't be reachable, cluster isn't enabled"); + } + catch (Exception e) + { + if (e.Message == "ERR This instance has cluster support disabled") + { + return; + } + + throw; + } + } + } + [Test] public async Task CommandACLsAsync() { From e3da613ef2df405ba3ac1a8d84d7c940007accc3 Mon Sep 17 00:00:00 2001 From: darrenge Date: Thu, 30 Jan 2025 17:56:16 -0800 Subject: [PATCH 2/2] Updated Lua BDN expected Allocated bytes for a few settings. (#984) --- test/BDNPerfTests/BDN_Benchmark_Config.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/BDNPerfTests/BDN_Benchmark_Config.json b/test/BDNPerfTests/BDN_Benchmark_Config.json index cdff7ac0f0..8e83bec381 100644 --- a/test/BDNPerfTests/BDN_Benchmark_Config.json +++ b/test/BDNPerfTests/BDN_Benchmark_Config.json @@ -122,13 +122,13 @@ "expected_ResetParametersSmall_Native,None": 1984, "expected_ResetParametersLarge_Native,None": 1984, "expected_ConstructSmall_Native,None": 2400, - "expected_ConstructLarge_Native,None": 5000, + "expected_ConstructLarge_Native,None": 5500, "expected_CompileForSessionSmall_Native,None": 2000, "expected_CompileForSessionLarge_Native,None": 1984, "expected_ResetParametersSmall_Tracked,Limit": 1984, "expected_ResetParametersLarge_Tracked,Limit": 1984, "expected_ConstructSmall_Tracked,Limit": 2400, - "expected_ConstructLarge_Tracked,Limit": 4500, + "expected_ConstructLarge_Tracked,Limit": 5500, "expected_CompileForSessionSmall_Tracked,Limit": 1984, "expected_CompileForSessionLarge_Tracked,Limit": 1984, "expected_ResetParametersSmall_Tracked,None": 1984, @@ -154,19 +154,19 @@ "expected_LookupHit_Native,None": 1984, "expected_LookupMiss_Native,None": 1984, "expected_LoadOuterHit_Native,None": 1984, - "expected_LoadInnerHit_Native,None": 1984, + "expected_LoadInnerHit_Native,None": 2350, "expected_LoadMiss_Native,None": 1984, "expected_Digest_Native,None": 1984, "expected_LookupHit_Tracked,Limit": 2000, "expected_LookupMiss_Tracked,Limit": 2000, "expected_LoadOuterHit_Tracked,Limit": 1984, - "expected_LoadInnerHit_Tracked,Limit": 1984, + "expected_LoadInnerHit_Tracked,Limit": 2350, "expected_LoadMiss_Tracked,Limit": 1984, "expected_Digest_Tracked,Limit": 1984, "expected_LookupHit_Tracked,None": 1984, "expected_LookupMiss_Tracked,None": 1984, "expected_LoadOuterHit_Tracked,None": 1984, - "expected_LoadInnerHit_Tracked,None": 1984, + "expected_LoadInnerHit_Tracked,None": 2350, "expected_LoadMiss_Tracked,None": 1984, "expected_Digest_Tracked,None": 1984 },