Skip to content

Commit

Permalink
Make traffic robustness test use kubernetes.Interface
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksander Mistewicz <[email protected]>
  • Loading branch information
AwesomePatrol committed Jan 7, 2025
1 parent d77bb5a commit 65f0888
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 86 deletions.
63 changes: 58 additions & 5 deletions tests/robustness/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type RecordingClient struct {
kvOperations *model.AppendableHistory
}

var _ clientv3.KV = (*RecordingClient)(nil)

type TimedWatchEvent struct {
model.WatchEvent
Time time.Duration
Expand Down Expand Up @@ -81,7 +83,16 @@ func (c *RecordingClient) Report() report.ClientReport {
}
}

func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) {
func (c *RecordingClient) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
panic("not implemented")
}

func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
revision := clientv3.OpGet(key, opts...).Rev()
return c.Range(ctx, key, "", revision, 0)
}

func (c *RecordingClient) OldGet(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) {
resp, err := c.Range(ctx, key, "", revision, 0)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -112,7 +123,7 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
return resp, err
}

func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) {
func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
Expand All @@ -122,7 +133,7 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3
return resp, err
}

func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) {
func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
Expand All @@ -132,7 +143,49 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del
return resp, err
}

func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
type wrappedTxn struct {
txn clientv3.Txn
conditions []clientv3.Cmp
onSuccess []clientv3.Op
onFailure []clientv3.Op
c *RecordingClient
}

var _ clientv3.Txn = (*wrappedTxn)(nil)

func (w *wrappedTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
w.conditions = append(w.conditions, cs...)
w.txn = w.txn.If(cs...)
return w
}

func (w *wrappedTxn) Then(ops ...clientv3.Op) clientv3.Txn {
w.onSuccess = append(w.onSuccess, ops...)
w.txn = w.txn.Then(ops...)
return w
}

func (w *wrappedTxn) Else(ops ...clientv3.Op) clientv3.Txn {
w.onFailure = append(w.onFailure, ops...)
w.txn = w.txn.Else(ops...)
return w
}

func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
w.c.kvMux.Lock()
defer w.c.kvMux.Unlock()
callTime := time.Since(w.c.baseTime)
resp, err := w.txn.Commit()
returnTime := time.Since(w.c.baseTime)
w.c.kvOperations.AppendTxn(w.conditions, w.onSuccess, w.onFailure, callTime, returnTime, resp, err)
return resp, err
}

func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn {
return &wrappedTxn{txn: c.client.Txn(ctx), c: c}
}

func (c *RecordingClient) OldTxn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
txn := c.client.Txn(ctx).If(
conditions...,
).Then(
Expand Down Expand Up @@ -190,7 +243,7 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
return resp, err
}

func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/failpoint/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et

var rev int64
for {
_, rev, err = cc.Get(ctx, "/", 0)
_, rev, err = cc.OldGet(ctx, "/", 0)
if err != nil {
return nil, fmt.Errorf("failed to get revision: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
var limit int64
switch request {
case StaleGet:
_, rev, err = c.client.Get(opCtx, c.randomKey(), lastRev)
_, rev, err = c.client.OldGet(opCtx, c.randomKey(), lastRev)
case Get:
_, rev, err = c.client.Get(opCtx, c.randomKey(), 0)
_, rev, err = c.client.OldGet(opCtx, c.randomKey(), 0)
case List:
var resp *clientv3.GetResponse
resp, err = c.client.Range(ctx, c.keyPrefix, clientv3.GetPrefixRangeEnd(c.keyPrefix), 0, limit)
Expand Down Expand Up @@ -205,14 +205,14 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
}
case MultiOpTxn:
var resp *clientv3.TxnResponse
resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil)
resp, err = c.client.OldTxn(opCtx, nil, c.pickMultiTxnOps(), nil)
if resp != nil {
rev = resp.Header.Revision
}
case CompareAndSet:
var kv *mvccpb.KeyValue
key := c.randomKey()
kv, rev, err = c.client.Get(opCtx, key, 0)
kv, rev, err = c.client.OldGet(opCtx, key, 0)
if err == nil {
c.limiter.Wait(ctx)
var expectedRevision int64
Expand All @@ -221,7 +221,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
}
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.TxnResponse
resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestID()))}, nil)
resp, err = c.client.OldTxn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestID()))}, nil)
txnCancel()
if resp != nil {
rev = resp.Header.Revision
Expand Down
94 changes: 19 additions & 75 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/identity"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
}

func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
kc := &kubernetesClient{client: c}
kc := kubernetes.Client{KV: c}
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
g := errgroup.Group{}
Expand All @@ -75,7 +76,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, l
if err != nil {
continue
}
t.Watch(ctx, kc, s, limiter, keyPrefix, rev+1)
t.Watch(ctx, c, s, limiter, keyPrefix, rev+1)
}
})
g.Go(func() error {
Expand Down Expand Up @@ -105,49 +106,47 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, l
g.Wait()
}

func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)

func (t kubernetesTraffic) Read(ctx context.Context, kc kubernetes.Interface, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
hasMore := true
rangeStart := keyPrefix
var kvs []*mvccpb.KeyValue
var revision int64
var cont string

for hasMore {
readCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, int64(limit))
resp, err := kc.List(readCtx, keyPrefix, kubernetes.ListOptions{Continue: cont, Revision: revision, Limit: int64(limit)})
cancel()
if err != nil {
return 0, err
}
limiter.Wait(ctx)

hasMore = resp.More
if len(resp.Kvs) > 0 && hasMore {
rangeStart = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
}
kvs = append(kvs, resp.Kvs...)
if revision == 0 {
revision = resp.Header.Revision
revision = resp.Revision
}
hasMore = limit > 0 && resp.Count > int64(limit)
if hasMore {
cont = string(kvs[len(kvs)-1].Key)
}
}
s.Reset(revision, kvs)
return revision, nil
}

func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) {
func (t kubernetesTraffic) Write(ctx context.Context, kc kubernetes.Interface, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()
count := s.Count()
if count < t.averageKeyCount/2 {
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
_, err = kc.OptimisticPut(writeCtx, t.generateKey(), []byte(fmt.Sprintf("%d", ids.NewRequestID())), 0, kubernetes.PutOptions{})
} else {
key, rev := s.PickRandom()
if rev == 0 {
return errors.New("storage empty")
}
if count > t.averageKeyCount*3/2 && nonUniqueWriteLimiter.Take() {
_, err = kc.OptimisticDelete(writeCtx, key, rev)
_, err = kc.OptimisticDelete(writeCtx, key, rev, kubernetes.DeleteOptions{})
nonUniqueWriteLimiter.Return()
} else {
shouldReturn := false
Expand All @@ -159,11 +158,11 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
op := random.PickRandom(choices)
switch op {
case KubernetesDelete:
_, err = kc.OptimisticDelete(writeCtx, key, rev)
_, err = kc.OptimisticDelete(writeCtx, key, rev, kubernetes.DeleteOptions{})
case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
_, err = kc.OptimisticPut(writeCtx, key, []byte(fmt.Sprintf("%d", ids.NewRequestID())), rev, kubernetes.PutOptions{})
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
_, err = kc.OptimisticPut(writeCtx, t.generateKey(), []byte(fmt.Sprintf("%d", ids.NewRequestID())), rev, kubernetes.PutOptions{})
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
Expand All @@ -188,15 +187,15 @@ func filterOutNonUniqueKubernetesWrites(choices []random.ChoiceWeight[Kubernetes
return resp
}

func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
func (t kubernetesTraffic) Watch(ctx context.Context, c *client.RecordingClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
defer cancel()

// Kubernetes issues Watch requests by requiring a leader to exist
// in the cluster:
// https://github.com/kubernetes/kubernetes/blob/2016fab3085562b4132e6d3774b6ded5ba9939fd/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L872
watchCtx = clientv3.WithRequireLeader(watchCtx)
for e := range kc.client.Watch(watchCtx, keyPrefix, revision, true, true, true) {
for e := range c.Watch(watchCtx, keyPrefix, revision, true, true, true) {
s.Update(e)
}
limiter.Wait(ctx)
Expand All @@ -214,61 +213,6 @@ const (
KubernetesCreate KubernetesRequestType = "create"
)

type kubernetesClient struct {
client *client.RecordingClient
}

func (k kubernetesClient) List(ctx context.Context, prefix string, revision, limit int64) (*clientv3.GetResponse, error) {
resp, err := k.client.Range(ctx, prefix, clientv3.GetPrefixRangeEnd(prefix), revision, limit)
if err != nil {
return nil, err
}
return resp, err
}

func (k kubernetesClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
return k.client.Range(ctx, start, end, revision, limit)
}

func (k kubernetesClient) OptimisticDelete(ctx context.Context, key string, expectedRevision int64) (*mvccpb.KeyValue, error) {
return k.optimisticOperationOrGet(ctx, key, clientv3.OpDelete(key), expectedRevision)
}

func (k kubernetesClient) OptimisticUpdate(ctx context.Context, key, value string, expectedRevision int64) (*mvccpb.KeyValue, error) {
return k.optimisticOperationOrGet(ctx, key, clientv3.OpPut(key, value), expectedRevision)
}

func (k kubernetesClient) OptimisticCreate(ctx context.Context, key, value string) error {
_, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", 0)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil)
return err
}

func (k kubernetesClient) RequestProgress(ctx context.Context) error {
// Kubernetes makes RequestProgress calls by requiring a leader to be
// present in the cluster:
// https://github.com/kubernetes/kubernetes/blob/2016fab3085562b4132e6d3774b6ded5ba9939fd/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L87
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
}

// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
resp, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{operation}, []clientv3.Op{clientv3.OpGet(key)})
if err != nil {
return nil, err
}
if !resp.Succeeded {
getResp := (*clientv3.GetResponse)(resp.Responses[0].GetResponseRange())
if err != nil || len(getResp.Kvs) == 0 {
return nil, err
}
if len(getResp.Kvs) == 1 {
return getResp.Kvs[0], err
}
}
return nil, err
}

type storage struct {
mux sync.RWMutex
keyRevision map[string]int64
Expand Down

0 comments on commit 65f0888

Please sign in to comment.