Fix(e2e): run a mysql client in kind to test graceful shutdown case (#…
fgksgf authored Dec 23, 2024
1 parent 56a1888 commit 43de8f5
Showing 8 changed files with 227 additions and 91 deletions.
134 changes: 134 additions & 0 deletions cmd/testing-workload/main.go
@@ -0,0 +1,134 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
    "context"
    "database/sql"
    "errors"
    "flag"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

var (
    host              string
    durationInMinutes int
    maxConnections    int
    sleepIntervalSec  int
    longTxnSleepSec   int
)

//nolint:mnd // default values
func main() {
    flag.StringVar(&host, "host", "", "host")
    flag.IntVar(&durationInMinutes, "duration", 10, "duration in minutes")
    flag.IntVar(&maxConnections, "max-connections", 30, "max connections")
    flag.IntVar(&sleepIntervalSec, "sleep-interval", 1, "sleep interval in seconds")
    flag.IntVar(&longTxnSleepSec, "long-txn-sleep", 10, "how many seconds to sleep to simulate a long transaction")
    flag.Parse()

    db, err := sql.Open("mysql", fmt.Sprintf("root:@(%s:4000)/test?charset=utf8mb4", host))
    if err != nil {
        panic(err)
    }
    if err = db.Ping(); err != nil {
        panic(err)
    }
    defer db.Close()
    db.SetConnMaxLifetime(time.Minute)
    db.SetMaxIdleConns(maxConnections)
    db.SetMaxOpenConns(maxConnections)

    table := "test.e2e_test"
    str := fmt.Sprintf("create table if not exists %s(id int primary key auto_increment, v int);", table)
    _, err = db.Exec(str)
    if err != nil {
        panic(err)
    }

    var totalCount, failCount atomic.Uint64
    var wg sync.WaitGroup
    clientCtx, cancel := context.WithTimeout(context.Background(), time.Duration(durationInMinutes)*time.Minute)
    defer cancel()

    for i := 1; i <= maxConnections; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-clientCtx.Done():
                    return
                default:
                    err := executeSimpleTransaction(db, id, table)
                    totalCount.Add(1)
                    if err != nil {
                        fmt.Printf("[%d-%s] failed to execute simple transaction(long: %v): %v\n", id, time.Now().String(), id%3 == 0, err)
                        failCount.Add(1)
                    }
                    time.Sleep(time.Duration(sleepIntervalSec) * time.Second)
                }
            }
        }(i)
    }
    wg.Wait()
    fmt.Printf("total count: %d, fail count: %d\n", totalCount.Load(), failCount.Load())
    if failCount.Load() > 0 {
        panic("there are failed transactions")
    }
}

// executeSimpleTransaction performs a transaction that inserts or updates the given id in the specified table.
func executeSimpleTransaction(db *sql.DB, id int, table string) error {
    tx, err := db.Begin()
    if err != nil {
        return fmt.Errorf("failed to begin txn: %w", err)
    }
    defer func() {
        if r := recover(); r != nil {
            _ = tx.Rollback()
        }
    }()

    // Prepare SQL statement to replace or insert a record
    //nolint:gosec // only for testing
    str := fmt.Sprintf("replace into %s(id, v) values(?, ?);", table)
    if _, err = tx.Exec(str, id, id); err != nil {
        _ = tx.Rollback()
        return fmt.Errorf("failed to exec statement: %w", err)
    }

    // Simulate a different operation by updating the value
    if _, err = tx.Exec(fmt.Sprintf("update %s set v = ? where id = ?;", table), id*2, id); err != nil {
        _ = tx.Rollback()
        return fmt.Errorf("failed to exec update statement: %w", err)
    }

    // Simulate a long transaction: every third client sleeps for longTxnSleepSec seconds
    if id%3 == 0 {
        time.Sleep(time.Duration(longTxnSleepSec) * time.Second)
    }

    // Commit the transaction
    if err = tx.Commit(); err != nil && !errors.Is(err, sql.ErrTxDone) {
        return fmt.Errorf("failed to commit txn: %w", err)
    }
    return nil
}
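For reference, a minimal sketch of invoking this workload by hand (the flag names come from the definitions above; the binary path and TiDB host are placeholders, not part of this commit):

# Placeholder path and host; the client dials <host>:4000 as root with an empty
# password and writes to the test.e2e_test table, so a reachable TiDB is needed.
./testing-workload \
  --host 127.0.0.1 \
  --duration 10 \
  --max-connections 30 \
  --sleep-interval 1 \
  --long-txn-sleep 10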
2 changes: 1 addition & 1 deletion hack/lib/build.sh
@@ -46,7 +46,7 @@ function build::all() {
shift
done
if [[ ${#targets[@]} -eq 0 ]]; then
targets=("operator" "prestop-checker")
targets=("operator" "prestop-checker" "testing-workload")
fi

local platforms
2 changes: 1 addition & 1 deletion hack/lib/e2e.sh
@@ -129,7 +129,7 @@ function e2e::prepare() {
e2e::install_rbac

# build the operator image and load it into the kind cluster
image::build prestop-checker operator --push
image::build prestop-checker operator testing-workload --push
e2e::uninstall_operator
e2e::install_operator

14 changes: 13 additions & 1 deletion image/Dockerfile
@@ -37,7 +37,6 @@ USER 65532:65532

ENTRYPOINT ["/operator"]


FROM --platform=$TARGETPLATFORM ghcr.io/pingcap-qe/bases/pingcap-base:v1.9.2 AS prestop-checker

ARG TARGETPLATFORM
@@ -50,3 +49,16 @@ COPY --from=builder ./_output/$TARGETPLATFORM/bin/prestop-checker prestop-checke
USER 65532:65532

ENTRYPOINT ["/prestop-checker"]

FROM --platform=$TARGETPLATFORM ghcr.io/pingcap-qe/bases/pingcap-base:v1.9.2 AS testing-workload

ARG TARGETPLATFORM

WORKDIR /

COPY --from=builder ./_output/$TARGETPLATFORM/bin/testing-workload testing-workload

# nonroot user of distroless
USER 65532:65532

ENTRYPOINT ["/testing-workload"]
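In the e2e flow this stage is built and pushed by image::build (see hack/lib/e2e.sh above). For a manual run, something along these lines could build the stage and load it into a kind cluster; the build context, cluster name, and the builder stage's prerequisites are assumptions:

# Builds only the testing-workload stage and loads it into a kind cluster.
docker build -f image/Dockerfile --target testing-workload -t pingcap/testing-workload:latest .
kind load docker-image pingcap/testing-workload:latest --name tidb-operator

The image tag matches the one referenced by the Job spec in tests/e2e/cluster/cluster.go.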
2 changes: 1 addition & 1 deletion pkg/configs/tidb/config.go
@@ -28,7 +28,7 @@ const (
// defaultGracefulWaitBeforeShutdownInSeconds is the default value of the tidb config `graceful-wait-before-shutdown`,
// which is set by the operator if not set by the user, for graceful shutdown.
// Note that the default value is zero in tidb-server.
defaultGracefulWaitBeforeShutdownInSeconds = 30
defaultGracefulWaitBeforeShutdownInSeconds = 60
)

// Config is a subset config of tidb
2 changes: 1 addition & 1 deletion pkg/controllers/tidb/tasks/cm_test.go
@@ -125,7 +125,7 @@ func TestConfigMap(t *testing.T) {
},
Data: map[string]string{
v1alpha1.ConfigFileName: `advertise-address = 'test-tidb-tidb.subdomain.default.svc'
graceful-wait-before-shutdown = 30
graceful-wait-before-shutdown = 60
host = '::'
path = 'test-pd.default:2379'
store = 'tikv'
125 changes: 72 additions & 53 deletions tests/e2e/cluster/cluster.go
@@ -15,23 +15,25 @@
package cluster

import (
    "bytes"
    "context"
    "database/sql"
    "fmt"
    "io"
    "slices"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/Masterminds/semver/v3"
    _ "github.com/go-sql-driver/mysql"

    //nolint: stylecheck // too many changes, refactor later
    . "github.com/onsi/ginkgo/v2"
    //nolint: stylecheck // too many changes, refactor later
    . "github.com/onsi/gomega"

    "github.com/Masterminds/semver/v3"
    _ "github.com/go-sql-driver/mysql"
    appsv1 "k8s.io/api/apps/v1"
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
@@ -1642,77 +1644,94 @@ location-labels = ["region", "zone", "host"]`
Expect(k8sClient.Create(ctx, dbg)).To(Succeed())

By("Waiting for the cluster to be ready")
// TODO: extract it to a common utils
svcName := dbg.Name + "-tidb"
var clusterIP string
Eventually(func(g Gomega) {
    _, ready := utiltidb.IsClusterReady(k8sClient, tc.Name, tc.Namespace)
    g.Expect(ready).To(BeTrue())

    g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg,
        []*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg}, nil)).To(Succeed())
    g.Expect(utiltidb.IsTiDBConnectable(ctx, k8sClient, fw,
        tc.Namespace, tc.Name, dbg.Name, "root", "", "")).To(Succeed())
    svc, err := clientSet.CoreV1().Services(tc.Namespace).Get(ctx, svcName, metav1.GetOptions{})
    g.Expect(err).To(BeNil())
    clusterIP = svc.Spec.ClusterIP
}).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed())

By("Connect to the TiDB cluster to run transactions")
// TODO: extract it to a common utils
svcName := dbg.Name + "-tidb"
dsn, cancel, err := utiltidb.PortForwardAndGetTiDBDSN(fw, tc.Namespace, svcName, "root", "", "test", "charset=utf8mb4")
Expect(err).To(BeNil())
defer cancel()
db, err := sql.Open("mysql", dsn)
Expect(err).To(BeNil())
defer db.Close()
maxConn := 30
db.SetMaxIdleConns(maxConn)
db.SetMaxOpenConns(maxConn)

table := "test.e2e_test"
str := fmt.Sprintf("create table if not exists %s(id int primary key auto_increment, v int);", table)
_, err = db.Exec(str)
By("Create a Job to connect to the TiDB cluster to run transactions")
jobName := "testing-workload-job"
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: tc.Namespace,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": jobName,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "testing-workload",
Image: "pingcap/testing-workload:latest",
Args: []string{
"--host", clusterIP,
"--duration", "8",
"--max-connections", "30",
},
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
BackoffLimit: ptr.To[int32](0),
},
}
_, err := clientSet.BatchV1().Jobs(tc.Namespace).Create(ctx, job, metav1.CreateOptions{})
Expect(err).To(BeNil())

var totalCount, failCount atomic.Uint64
var wg sync.WaitGroup
clientCtx, cancel2 := context.WithCancel(ctx)
defer cancel2()
for i := 0; i < maxConn; i++ {
    id := i
    wg.Add(1)
    go func(db *sql.DB) {
        defer wg.Done()
        for {
            select {
            case <-clientCtx.Done():
                return
            default:
                err := utiltidb.ExecuteSimpleTransaction(db, id, table)
                totalCount.Add(1)
                if err != nil {
                    failCount.Add(1)
                }
                time.Sleep(50 * time.Millisecond) //nolint:mnd // easy to understand
            }
        }
    }(db)
}
By("Ensure the job pod is running")
var jobPodName string
Eventually(func(g Gomega) {
    pods, err := clientSet.CoreV1().Pods(tc.Namespace).List(ctx, metav1.ListOptions{
        LabelSelector: fmt.Sprintf("app=%s", jobName),
    })
    g.Expect(err).To(BeNil())
    g.Expect(len(pods.Items)).To(Equal(1))
    g.Expect(pods.Items[0].Status.Phase).To(Equal(corev1.PodRunning))
    jobPodName = pods.Items[0].Name
}).WithTimeout(time.Minute).WithPolling(createClusterPolling).Should(Succeed())

By("Rolling restart TiDB")
var dbgGet v1alpha1.TiDBGroup
Expect(k8sClient.Get(ctx, client.ObjectKey{Namespace: tc.Namespace, Name: dbg.Name}, &dbgGet)).To(Succeed())
dbgGet.Spec.Template.Spec.Config = logLevelConfig
dbgGet.Spec.Template.Spec.Config = "log.level = 'warn'"
Expect(k8sClient.Update(ctx, &dbgGet)).To(Succeed())

Eventually(func(g Gomega) {
_, ready := utiltidb.IsClusterReady(k8sClient, tc.Name, tc.Namespace)
g.Expect(ready).To(BeTrue())
g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg,
[]*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg}, nil)).To(Succeed())
jobGet, err := clientSet.BatchV1().Jobs(tc.Namespace).Get(ctx, jobName, metav1.GetOptions{})
g.Expect(err).To(BeNil())
if jobGet.Status.Failed > 0 {
// print the logs if the job failed
req := clientSet.CoreV1().Pods(tc.Namespace).GetLogs(jobPodName, &corev1.PodLogOptions{})
podLogs, err := req.Stream(ctx)
g.Expect(err).To(BeNil())
defer podLogs.Close()

g.Expect(totalCount.Load()).To(BeNumerically(">", 0))
buf := new(bytes.Buffer)
_, _ = io.Copy(buf, podLogs)
GinkgoWriter.Println(buf.String())
Fail("job failed")
}
g.Expect(jobGet.Status.Succeeded).To(BeNumerically("==", 1))
}).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed())
GinkgoWriter.Printf("total count: %d, fail count: %d\n", totalCount.Load(), failCount.Load())
Expect(failCount.Load()).To(BeZero())
cancel2()
wg.Wait()
})
})
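When debugging this case manually, the Job and its pod can be inspected with standard kubectl commands; the namespace is a placeholder, while the job name and app label come from the spec above:

kubectl -n <namespace> get job testing-workload-job
kubectl -n <namespace> logs -l app=testing-workload-job --tail=100

The workload exits non-zero if any transaction failed, so a failed Job (jobGet.Status.Failed > 0) is what triggers the log dump and Fail in the Eventually block above.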
