Skip to content

Commit

Permalink
Topology Server Locking Refactor (vitessio#16005)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jun 12, 2024
1 parent 999001b commit 3f2d096
Show file tree
Hide file tree
Showing 17 changed files with 711 additions and 605 deletions.
83 changes: 83 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -140,6 +142,87 @@ func TestTopoRestart(t *testing.T) {
}
}

// TestShardLocking tests that shard locking works as intended.
//
// It checks that:
//   - a shard lock cannot be re-acquired through the context that already holds it,
//   - TryLockShard returns an error instead of blocking while the lock is held,
//   - CheckShardLocked succeeds while the lock is held and fails after it is released,
//   - a second LockShard call from another goroutine stays blocked until the
//     first lock is released, and then succeeds.
func TestShardLocking(t *testing.T) {
	// create topo server connection
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Acquire a shard lock.
	ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
	require.NoError(t, err)
	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
	require.ErrorContains(t, err, "lock for shard customer/0 is already held")
	// Also check that TryLockShard is non-blocking and returns an error.
	_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
	require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
	// Check that CheckShardLocked doesn't return an error.
	err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): this flag is written by the goroutine below and read by the
	// test goroutine without synchronization. It stays a plain bool because
	// topoutils.WaitForBoolValue takes a *bool.
	secondThreadLockAcquired := false
	go func() {
		_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
		if err != nil {
			// Use t.Errorf rather than require here: require stops the test via
			// FailNow, which must only be called from the goroutine running the
			// test function. We also return before deferring unlock, since there
			// is nothing to release when the lock was not acquired.
			t.Errorf("second LockShard call failed: %v", err)
			return
		}
		defer unlock(&err)
		secondThreadLockAcquired = true
	}()

	// Wait for some time and ensure that the second shard lock acquisition is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondThreadLockAcquired)

	// Unlock the shard. err is nil at this point, so any value found in it
	// afterwards was stored by unlock itself.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have shard lock acquired.
	err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
	require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

	// Wait to see that the second goroutine was able to acquire the shard lock.
	topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestKeyspaceLocking tests that keyspace locking works as intended.
//
// It checks that:
//   - a keyspace lock cannot be re-acquired through the context that already holds it,
//   - CheckKeyspaceLocked succeeds while the lock is held and fails after it is released,
//   - a second LockKeyspace call from another goroutine stays blocked until the
//     first lock is released, and then succeeds.
func TestKeyspaceLocking(t *testing.T) {
	// create topo server connection
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Acquire a keyspace lock.
	ctx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
	require.NoError(t, err)
	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockKeyspace(ctx, KeyspaceName, "TestKeyspaceLocking")
	require.ErrorContains(t, err, "lock for keyspace customer is already held")
	// Check that CheckKeyspaceLocked doesn't return an error.
	err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): this flag is written by the goroutine below and read by the
	// test goroutine without synchronization. It stays a plain bool because
	// topoutils.WaitForBoolValue takes a *bool.
	secondThreadLockAcquired := false
	go func() {
		_, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
		if err != nil {
			// Use t.Errorf rather than require here: require stops the test via
			// FailNow, which must only be called from the goroutine running the
			// test function. We also return before deferring unlock, since there
			// is nothing to release when the lock was not acquired.
			t.Errorf("second LockKeyspace call failed: %v", err)
			return
		}
		defer unlock(&err)
		secondThreadLockAcquired = true
	}()

	// Wait for some time and ensure that the second keyspace lock acquisition is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondThreadLockAcquired)

	// Unlock the keyspace. err is nil at this point, so any value found in it
	// afterwards was stored by unlock itself.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have keyspace lock acquired.
	err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
	require.ErrorContains(t, err, "keyspace customer is not locked (no lockInfo in map)")

	// Wait to see that the second goroutine was able to acquire the keyspace lock.
	topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
86 changes: 86 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -111,10 +113,94 @@ func TestTopoDownServingQuery(t *testing.T) {
execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`)
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString())
defer func() {
_ = clusterInstance.TopoProcess.SetupEtcd()
}()
time.Sleep(3 * time.Second)
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

// TestShardLocking tests that shard locking works as intended.
//
// It checks that:
//   - a shard lock cannot be re-acquired through the context that already holds it,
//   - TryLockShard returns an error instead of blocking while the lock is held,
//   - CheckShardLocked succeeds while the lock is held and fails after it is released,
//   - a second LockShard call from another goroutine stays blocked until the
//     first lock is released, and then succeeds.
func TestShardLocking(t *testing.T) {
	// create topo server connection
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Acquire a shard lock.
	ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
	require.NoError(t, err)
	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
	require.ErrorContains(t, err, "lock for shard customer/0 is already held")
	// Also check that TryLockShard is non-blocking and returns an error.
	_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
	require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
	// Check that CheckShardLocked doesn't return an error.
	err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): this flag is written by the goroutine below and read by the
	// test goroutine without synchronization. It stays a plain bool because
	// topoutils.WaitForBoolValue takes a *bool.
	secondThreadLockAcquired := false
	go func() {
		_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
		if err != nil {
			// Use t.Errorf rather than require here: require stops the test via
			// FailNow, which must only be called from the goroutine running the
			// test function. We also return before deferring unlock, since there
			// is nothing to release when the lock was not acquired.
			t.Errorf("second LockShard call failed: %v", err)
			return
		}
		defer unlock(&err)
		secondThreadLockAcquired = true
	}()

	// Wait for some time and ensure that the second shard lock acquisition is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondThreadLockAcquired)

	// Unlock the shard. err is nil at this point, so any value found in it
	// afterwards was stored by unlock itself.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have shard lock acquired.
	err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
	require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

	// Wait to see that the second goroutine was able to acquire the shard lock.
	topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestKeyspaceLocking tests that keyspace locking works as intended.
//
// It checks that:
//   - a keyspace lock cannot be re-acquired through the context that already holds it,
//   - CheckKeyspaceLocked succeeds while the lock is held and fails after it is released,
//   - a second LockKeyspace call from another goroutine stays blocked until the
//     first lock is released, and then succeeds.
func TestKeyspaceLocking(t *testing.T) {
	// create topo server connection
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Acquire a keyspace lock.
	ctx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
	require.NoError(t, err)
	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockKeyspace(ctx, KeyspaceName, "TestKeyspaceLocking")
	require.ErrorContains(t, err, "lock for keyspace customer is already held")
	// Check that CheckKeyspaceLocked doesn't return an error.
	err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): this flag is written by the goroutine below and read by the
	// test goroutine without synchronization. It stays a plain bool because
	// topoutils.WaitForBoolValue takes a *bool.
	secondThreadLockAcquired := false
	go func() {
		_, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
		if err != nil {
			// Use t.Errorf rather than require here: require stops the test via
			// FailNow, which must only be called from the goroutine running the
			// test function. We also return before deferring unlock, since there
			// is nothing to release when the lock was not acquired.
			t.Errorf("second LockKeyspace call failed: %v", err)
			return
		}
		defer unlock(&err)
		secondThreadLockAcquired = true
	}()

	// Wait for some time and ensure that the second keyspace lock acquisition is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondThreadLockAcquired)

	// Unlock the keyspace. err is nil at this point, so any value found in it
	// afterwards was stored by unlock itself.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have keyspace lock acquired.
	err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
	require.ErrorContains(t, err, "keyspace customer is not locked (no lockInfo in map)")

	// Wait to see that the second goroutine was able to acquire the keyspace lock.
	topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
41 changes: 41 additions & 0 deletions go/test/endtoend/topotest/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// WaitForBoolValue takes a pointer to a boolean and waits for it to reach a certain value.
//
// The value is polled every 100ms. If it has not become waitFor within 15
// seconds, the test is failed through require.Failf.
//
// NOTE(review): *val is read without any synchronization, so a caller that
// updates it from another goroutine has a data race with this read.
func WaitForBoolValue(t *testing.T, val *bool, waitFor bool) {
	// Mark this function as a helper so that a timeout is reported at the
	// caller's line, as the other test helpers in this package tree do.
	t.Helper()
	timeout := time.After(15 * time.Second)
	for {
		select {
		case <-timeout:
			require.Failf(t, "Failed waiting for bool value", "Timed out waiting for the boolean to become %v", waitFor)
			return
		default:
			if *val == waitFor {
				return
			}
			// Back off between polls instead of spinning.
			time.Sleep(100 * time.Millisecond)
		}
	}
}
83 changes: 83 additions & 0 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -116,6 +118,87 @@ func TestTopoDownServingQuery(t *testing.T) {
utils.AssertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

// TestShardLocking tests that shard locking works as intended.
//
// It checks that:
//   - a shard lock cannot be re-acquired through the context that already holds it,
//   - TryLockShard returns an error instead of blocking while the lock is held,
//   - CheckShardLocked succeeds while the lock is held and fails after it is released,
//   - a second LockShard call from another goroutine stays blocked until the
//     first lock is released, and then succeeds.
func TestShardLocking(t *testing.T) {
	// create topo server connection
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Acquire a shard lock.
	ctx, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
	require.NoError(t, err)
	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockShard(ctx, KeyspaceName, "0", "TestShardLocking")
	require.ErrorContains(t, err, "lock for shard customer/0 is already held")
	// Also check that TryLockShard is non-blocking and returns an error.
	_, _, err = ts.TryLockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
	require.ErrorContains(t, err, "node already exists: lock already exists at path keyspaces/customer/shards/0")
	// Check that CheckShardLocked doesn't return an error.
	err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): this flag is written by the goroutine below and read by the
	// test goroutine without synchronization. It stays a plain bool because
	// topoutils.WaitForBoolValue takes a *bool.
	secondThreadLockAcquired := false
	go func() {
		_, unlock, err := ts.LockShard(context.Background(), KeyspaceName, "0", "TestShardLocking")
		if err != nil {
			// Use t.Errorf rather than require here: require stops the test via
			// FailNow, which must only be called from the goroutine running the
			// test function. We also return before deferring unlock, since there
			// is nothing to release when the lock was not acquired.
			t.Errorf("second LockShard call failed: %v", err)
			return
		}
		defer unlock(&err)
		secondThreadLockAcquired = true
	}()

	// Wait for some time and ensure that the second shard lock acquisition is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondThreadLockAcquired)

	// Unlock the shard. err is nil at this point, so any value found in it
	// afterwards was stored by unlock itself.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have shard lock acquired.
	err = topo.CheckShardLocked(ctx, KeyspaceName, "0")
	require.ErrorContains(t, err, "shard customer/0 is not locked (no lockInfo in map)")

	// Wait to see that the second goroutine was able to acquire the shard lock.
	topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestKeyspaceLocking tests that keyspace locking works as intended.
//
// It checks that:
//   - a keyspace lock cannot be re-acquired through the context that already holds it,
//   - CheckKeyspaceLocked succeeds while the lock is held and fails after it is released,
//   - a second LockKeyspace call from another goroutine stays blocked until the
//     first lock is released, and then succeeds.
func TestKeyspaceLocking(t *testing.T) {
	// create topo server connection
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Acquire a keyspace lock.
	ctx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
	require.NoError(t, err)
	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockKeyspace(ctx, KeyspaceName, "TestKeyspaceLocking")
	require.ErrorContains(t, err, "lock for keyspace customer is already held")
	// Check that CheckKeyspaceLocked doesn't return an error.
	err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): this flag is written by the goroutine below and read by the
	// test goroutine without synchronization. It stays a plain bool because
	// topoutils.WaitForBoolValue takes a *bool.
	secondThreadLockAcquired := false
	go func() {
		_, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestKeyspaceLocking")
		if err != nil {
			// Use t.Errorf rather than require here: require stops the test via
			// FailNow, which must only be called from the goroutine running the
			// test function. We also return before deferring unlock, since there
			// is nothing to release when the lock was not acquired.
			t.Errorf("second LockKeyspace call failed: %v", err)
			return
		}
		defer unlock(&err)
		secondThreadLockAcquired = true
	}()

	// Wait for some time and ensure that the second keyspace lock acquisition is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondThreadLockAcquired)

	// Unlock the keyspace. err is nil at this point, so any value found in it
	// afterwards was stored by unlock itself.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have keyspace lock acquired.
	err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
	require.ErrorContains(t, err, "keyspace customer is not locked (no lockInfo in map)")

	// Wait to see that the second goroutine was able to acquire the keyspace lock.
	topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
2 changes: 1 addition & 1 deletion go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
}
for index, sql := range sqls {
// Attempt to renew lease:
if err := rl.Do(func() error { return topo.CheckKeyspaceLockedAndRenew(ctx, exec.keyspace) }); err != nil {
if err := rl.Do(func() error { return topo.CheckKeyspaceLocked(ctx, exec.keyspace) }); err != nil {
return errorExecResult(vterrors.Wrapf(err, "CheckKeyspaceLocked in ApplySchemaKeyspace %v", exec.keyspace))
}
execResult.CurSQLIndex = index
Expand Down
Loading

0 comments on commit 3f2d096

Please sign in to comment.