diff --git a/server/auth/store.go b/server/auth/store.go index 19dd7e738ad..2843de8cd29 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -30,6 +30,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" @@ -45,10 +46,6 @@ var ( revisionKey = []byte("authRevision") - authBucketName = []byte("auth") - authUsersBucketName = []byte("authUsers") - authRolesBucketName = []byte("authRoles") - ErrRootUserNotExist = errors.New("auth: root user does not exist") ErrRootRoleNotExist = errors.New("auth: root user does not have root role") ErrUserAlreadyExist = errors.New("auth: user already exists") @@ -240,7 +237,7 @@ func (as *authStore) AuthEnable() error { return ErrRootRoleNotExist } - tx.UnsafePut(authBucketName, enableFlagKey, authEnabled) + tx.UnsafePut(buckets.Auth, enableFlagKey, authEnabled) as.enabled = true as.tokenProvider.enable() @@ -262,7 +259,7 @@ func (as *authStore) AuthDisable() { b := as.be tx := b.BatchTx() tx.Lock() - tx.UnsafePut(authBucketName, enableFlagKey, authDisabled) + tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled) as.commitRevision(tx) tx.Unlock() b.ForceCommit() @@ -357,7 +354,7 @@ func (as *authStore) Recover(be backend.Backend) { as.be = be tx := be.BatchTx() tx.Lock() - _, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0) + _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { enabled = true @@ -906,7 +903,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { } func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User { - _, vs := tx.UnsafeRange(authUsersBucketName, []byte(username), nil, 0) + _, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil } @@ -924,7 +921,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User { } func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User { - _, vs := tx.UnsafeRange(authUsersBucketName, []byte{0}, []byte{0xff}, -1) + _, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil } @@ -946,15 +943,15 @@ func putUser(lg *zap.Logger, tx backend.BatchTx, user *authpb.User) { if err != nil { lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) } - tx.UnsafePut(authUsersBucketName, user.Name, b) + tx.UnsafePut(buckets.AuthUsers, user.Name, b) } func delUser(tx backend.BatchTx, username string) { - tx.UnsafeDelete(authUsersBucketName, []byte(username)) + tx.UnsafeDelete(buckets.AuthUsers, []byte(username)) } func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role { - _, vs := tx.UnsafeRange(authRolesBucketName, []byte(rolename), nil, 0) + _, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0) if len(vs) == 0 { return nil } @@ -968,7 +965,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role { } func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role { - _, vs := tx.UnsafeRange(authRolesBucketName, []byte{0}, []byte{0xff}, -1) + _, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil } @@ -995,11 +992,11 @@ func putRole(lg *zap.Logger, tx backend.BatchTx, role *authpb.Role) { ) } - tx.UnsafePut(authRolesBucketName, role.Name, b) + tx.UnsafePut(buckets.AuthRoles, role.Name, b) } func delRole(tx backend.BatchTx, rolename string) { - tx.UnsafeDelete(authRolesBucketName, []byte(rolename)) + tx.UnsafeDelete(buckets.AuthRoles, []byte(rolename)) } func (as *authStore) IsAuthEnabled() bool { @@ -1028,12 +1025,12 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo tx := be.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(authBucketName) - tx.UnsafeCreateBucket(authUsersBucketName) - tx.UnsafeCreateBucket(authRolesBucketName) + tx.UnsafeCreateBucket(buckets.Auth) + tx.UnsafeCreateBucket(buckets.AuthUsers) + tx.UnsafeCreateBucket(buckets.AuthRoles) enabled := false - _, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0) + _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { enabled = true @@ -1076,11 +1073,11 @@ func (as *authStore) commitRevision(tx backend.BatchTx) { atomic.AddUint64(&as.revision, 1) revBytes := make([]byte, revBytesLen) binary.BigEndian.PutUint64(revBytes, as.Revision()) - tx.UnsafePut(authBucketName, revisionKey, revBytes) + tx.UnsafePut(buckets.Auth, revisionKey, revBytes) } func getRevision(tx backend.BatchTx) uint64 { - _, vs := tx.UnsafeRange(authBucketName, revisionKey, nil, 0) + _, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase return 0 diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 3187d12f7d1..3df9588be8c 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -34,6 +34,7 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "github.com/coreos/go-semver/semver" "github.com/prometheus/client_golang/prometheus" @@ -700,7 +701,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi tx := be.ReadTx() tx.RLock() defer tx.RUnlock() - keys, vals := tx.UnsafeRange(clusterBucketName, ckey, nil, 0) + keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0) if len(keys) == 0 { return nil } @@ -719,7 +720,7 @@ func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo tx := be.ReadTx() tx.Lock() defer tx.Unlock() - keys, vals := tx.UnsafeRange(clusterBucketName, dkey, nil, 0) + keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0) if len(keys) == 0 { return nil } diff --git a/server/etcdserver/api/membership/confstate.go b/server/etcdserver/api/membership/confstate.go index 9bfc71b379c..3aa8c649b2e 100644 --- a/server/etcdserver/api/membership/confstate.go +++ b/server/etcdserver/api/membership/confstate.go @@ -19,8 +19,8 @@ import ( "log" "go.etcd.io/etcd/raft/v3/raftpb" - "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -36,13 +36,13 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err)) } - tx.UnsafePut(mvcc.MetaBucketName, confStateKey, confStateBytes) + tx.UnsafePut(buckets.Meta, confStateKey, confStateBytes) } // UnsafeConfStateFromBackend retrieves ConfState from the backend. // Returns nil if confState in backend is not persisted (e.g. backend writen by 0 { bs2 := make([]byte, 8) binary.BigEndian.PutUint64(bs2, term) - tx.UnsafePut(MetaBucketName, TermKeyName, bs2) + tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2) } } diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 5dba54db02e..7236515f2b3 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -27,6 +27,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -39,8 +40,6 @@ const MaxLeaseTTL = 9000000000 var ( forever = time.Time{} - leaseBucketName = []byte("lease") - // maximum number of leases to revoke per second; configurable for tests leaseRevokeRate = 1000 @@ -337,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error { // lease deletion needs to be in the same backend transaction with the // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. - le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) + le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID))) txn.End() @@ -771,8 +770,8 @@ func (le *lessor) initAndRecover() { tx := le.b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(leaseBucketName) - _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) + tx.UnsafeCreateBucket(buckets.Lease) + _, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue. for i := range vs { var lpb leasepb.Lease @@ -831,7 +830,7 @@ func (l *Lease) persistTo(b backend.Backend) { } b.BatchTx().Lock() - b.BatchTx().UnsafePut(leaseBucketName, key, val) + b.BatchTx().UnsafePut(buckets.Lease, key, val) b.BatchTx().Unlock() } diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index c6cb0518e84..58a4ad29086 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -28,6 +28,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -92,7 +93,7 @@ func TestLessorGrant(t *testing.T) { } be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0) + _, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0) if len(vs) != 1 { t.Errorf("len(vs) = %d, want 1", len(vs)) } @@ -195,7 +196,7 @@ func TestLessorRevoke(t *testing.T) { } be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0) + _, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0) if len(vs) != 0 { t.Errorf("len(vs) = %d, want 0", len(vs)) } diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index bd194b24c2c..b7207c1717a 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -53,7 +53,7 @@ type Backend interface { ConcurrentReadTx() ReadTx Snapshot() Snapshot - Hash(ignores map[IgnoreKey]struct{}) (uint32, error) + Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) // Size returns the current size of the backend physically allocated. // The backend can hold DB space that is not utilized at the moment, // since it can conduct pre-allocation or spare unused space for recycling. @@ -194,10 +194,10 @@ func newBackend(bcfg BackendConfig) *backend { readTx: &readTx{ baseReadTx: baseReadTx{ buf: txReadBuffer{ - txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)}, bufVersion: 0, }, - buckets: make(map[string]*bolt.Bucket), + buckets: make(map[BucketID]*bolt.Bucket), txWg: new(sync.WaitGroup), txMu: new(sync.RWMutex), }, @@ -358,12 +358,7 @@ func (b *backend) Snapshot() Snapshot { return &snapshot{tx, stopc, donec} } -type IgnoreKey struct { - Bucket string - Key string -} - -func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) { +func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) { h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) b.mu.RLock() @@ -377,8 +372,7 @@ func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) { } h.Write(next) b.ForEach(func(k, v []byte) error { - bk := IgnoreKey{Bucket: string(next), Key: string(k)} - if _, ok := ignores[bk]; !ok { + if ignores != nil && !ignores(next, k) { h.Write(k) h.Write(v) } @@ -587,7 +581,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { if berr != nil { return berr } - tmpb.FillPercent = 0.9 // for seq write in for each + tmpb.FillPercent = 0.9 // for bucket2seq write in for each if err = b.ForEach(func(k, v []byte) error { count++ @@ -601,7 +595,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return err } tmpb = tmptx.Bucket(next) - tmpb.FillPercent = 0.9 // for seq write in for each + tmpb.FillPercent = 0.9 // for bucket2seq write in for each count = 0 } diff --git a/server/mvcc/backend/backend_bench_test.go b/server/mvcc/backend/backend_bench_test.go index 4f3599a8710..7bfae653178 100644 --- a/server/mvcc/backend/backend_bench_test.go +++ b/server/mvcc/backend/backend_bench_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" ) func BenchmarkBackendPut(b *testing.B) { @@ -41,13 +42,13 @@ func BenchmarkBackendPut(b *testing.B) { batchTx := backend.BatchTx() batchTx.Lock() - batchTx.UnsafeCreateBucket([]byte("test")) + batchTx.UnsafeCreateBucket(buckets.Test) batchTx.Unlock() b.ResetTimer() for i := 0; i < b.N; i++ { batchTx.Lock() - batchTx.UnsafePut([]byte("test"), keys[i], value) + batchTx.UnsafePut(buckets.Test, keys[i], value) batchTx.Unlock() } } diff --git a/server/mvcc/backend/backend_test.go b/server/mvcc/backend/backend_test.go index bb7a34e6d3d..af93f1ab859 100644 --- a/server/mvcc/backend/backend_test.go +++ b/server/mvcc/backend/backend_test.go @@ -25,6 +25,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" ) func TestBackendClose(t *testing.T) { @@ -52,8 +53,8 @@ func TestBackendSnapshot(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) - tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) tx.Unlock() b.ForceCommit() @@ -77,7 +78,7 @@ func TestBackendSnapshot(t *testing.T) { newTx := nb.BatchTx() newTx.Lock() - ks, _ := newTx.UnsafeRange([]byte("test"), []byte("foo"), []byte("goo"), 0) + ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0) if len(ks) != 1 { t.Errorf("len(kvs) = %d, want 1", len(ks)) } @@ -94,8 +95,8 @@ func TestBackendBatchIntervalCommit(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) - tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) tx.Unlock() for i := 0; i < 10; i++ { @@ -126,9 +127,9 @@ func TestBackendDefrag(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafeCreateBucket(buckets.Test) for i := 0; i < backend.DefragLimitForTest()+100; i++ { - tx.UnsafePut([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)), []byte("bar")) + tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar")) } tx.Unlock() b.ForceCommit() @@ -137,7 +138,7 @@ func TestBackendDefrag(t *testing.T) { tx = b.BatchTx() tx.Lock() for i := 0; i < 50; i++ { - tx.UnsafeDelete([]byte("test"), []byte(fmt.Sprintf("foo_%d", i))) + tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i))) } tx.Unlock() b.ForceCommit() @@ -171,8 +172,8 @@ func TestBackendDefrag(t *testing.T) { // try put more keys after shrink. tx = b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) - tx.UnsafePut([]byte("test"), []byte("more"), []byte("bar")) + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar")) tx.Unlock() b.ForceCommit() } @@ -184,15 +185,15 @@ func TestBackendWriteback(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("key")) - tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar")) - tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz")) - tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1")) + tx.UnsafeCreateBucket(buckets.Key) + tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar")) + tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz")) + tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1")) tx.Unlock() // overwrites should be propagated too tx.Lock() - tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2")) + tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2")) tx.Unlock() keys := []struct { @@ -242,12 +243,14 @@ func TestBackendWriteback(t *testing.T) { } rtx := b.ReadTx() for i, tt := range keys { - rtx.RLock() - k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit) - rtx.RUnlock() - if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) { - t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v) - } + func() { + rtx.RLock() + defer rtx.RUnlock() + k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit) + if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) { + t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v) + } + }() } } @@ -258,20 +261,20 @@ func TestConcurrentReadTx(t *testing.T) { wtx1 := b.BatchTx() wtx1.Lock() - wtx1.UnsafeCreateBucket([]byte("key")) - wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC")) - wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1")) + wtx1.UnsafeCreateBucket(buckets.Key) + wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC")) + wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1")) wtx1.Unlock() wtx2 := b.BatchTx() wtx2.Lock() - wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF")) - wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2")) + wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF")) + wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2")) wtx2.Unlock() rtx := b.ConcurrentReadTx() rtx.RLock() // no-op - k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0) + k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0) rtx.RUnlock() wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")} wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")} @@ -288,10 +291,10 @@ func TestBackendWritebackForEach(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("key")) + tx.UnsafeCreateBucket(buckets.Key) for i := 0; i < 5; i++ { k := []byte(fmt.Sprintf("%04d", i)) - tx.UnsafePut([]byte("key"), k, []byte("bar")) + tx.UnsafePut(buckets.Key, k, []byte("bar")) } tx.Unlock() @@ -299,10 +302,10 @@ func TestBackendWritebackForEach(t *testing.T) { b.ForceCommit() tx.Lock() - tx.UnsafeCreateBucket([]byte("key")) + tx.UnsafeCreateBucket(buckets.Key) for i := 5; i < 20; i++ { k := []byte(fmt.Sprintf("%04d", i)) - tx.UnsafePut([]byte("key"), k, []byte("bar")) + tx.UnsafePut(buckets.Key, k, []byte("bar")) } tx.Unlock() @@ -313,7 +316,7 @@ func TestBackendWritebackForEach(t *testing.T) { } rtx := b.ReadTx() rtx.RLock() - assert.NoError(t, rtx.UnsafeForEach([]byte("key"), getSeq)) + assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq)) rtx.RUnlock() partialSeq := seq @@ -322,7 +325,7 @@ func TestBackendWritebackForEach(t *testing.T) { b.ForceCommit() tx.Lock() - assert.NoError(t, tx.UnsafeForEach([]byte("key"), getSeq)) + assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq)) tx.Unlock() if seq != partialSeq { diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index 74107b445e5..b2b0ad7cbf0 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -25,13 +25,30 @@ import ( "go.uber.org/zap" ) +type BucketID int + +type Bucket interface { + // ID returns a unique identifier of a bucket. + // The id must NOT be persisted and can be used as lightweight identificator + // in the in-memory maps. + ID() BucketID + Name() []byte + // String implements Stringer (human readable name). + String() string + + // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys; + // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket + // is known to never overwrite any key so range is safe. + IsSafeRangeBucket() bool +} + type BatchTx interface { ReadTx - UnsafeCreateBucket(name []byte) - UnsafeDeleteBucket(name []byte) - UnsafePut(bucketName []byte, key []byte, value []byte) - UnsafeSeqPut(bucketName []byte, key []byte, value []byte) - UnsafeDelete(bucketName []byte, key []byte) + UnsafeCreateBucket(bucket Bucket) + UnsafeDeleteBucket(bucket Bucket) + UnsafePut(bucket Bucket, key []byte, value []byte) + UnsafeSeqPut(bucket Bucket, key []byte, value []byte) + UnsafeDelete(bucket Bucket, key []byte) // Commit commits a previous tx and begins a new writable one. Commit() // CommitAndStop commits the previous tx and does not create a new one. @@ -69,24 +86,24 @@ func (t *batchTx) RUnlock() { panic("unexpected RUnlock") } -func (t *batchTx) UnsafeCreateBucket(name []byte) { - _, err := t.tx.CreateBucket(name) +func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { + _, err := t.tx.CreateBucket(bucket.Name()) if err != nil && err != bolt.ErrBucketExists { t.backend.lg.Fatal( "failed to create a bucket", - zap.String("bucket-name", string(name)), + zap.Stringer("bucket-name", bucket), zap.Error(err), ) } t.pending++ } -func (t *batchTx) UnsafeDeleteBucket(name []byte) { - err := t.tx.DeleteBucket(name) +func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) { + err := t.tx.DeleteBucket(bucket.Name()) if err != nil && err != bolt.ErrBucketNotFound { t.backend.lg.Fatal( "failed to delete a bucket", - zap.String("bucket-name", string(name)), + zap.Stringer("bucket-name", bucket), zap.Error(err), ) } @@ -94,21 +111,21 @@ func (t *batchTx) UnsafeDeleteBucket(name []byte) { } // UnsafePut must be called holding the lock on the tx. -func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { - t.unsafePut(bucketName, key, value, false) +func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) { + t.unsafePut(bucket, key, value, false) } // UnsafeSeqPut must be called holding the lock on the tx. -func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { - t.unsafePut(bucketName, key, value, true) +func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { + t.unsafePut(bucket, key, value, true) } -func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) { - bucket := t.tx.Bucket(bucketName) +func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) { + bucket := t.tx.Bucket(bucketType.Name()) if bucket == nil { t.backend.lg.Fatal( "failed to find a bucket", - zap.String("bucket-name", string(bucketName)), + zap.Stringer("bucket-name", bucketType), zap.Stack("stack"), ) } @@ -120,7 +137,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo if err := bucket.Put(key, value); err != nil { t.backend.lg.Fatal( "failed to write to a bucket", - zap.String("bucket-name", string(bucketName)), + zap.Stringer("bucket-name", bucketType), zap.Error(err), ) } @@ -128,12 +145,12 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo } // UnsafeRange must be called holding the lock on the tx. -func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { - bucket := t.tx.Bucket(bucketName) +func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + bucket := t.tx.Bucket(bucketType.Name()) if bucket == nil { t.backend.lg.Fatal( "failed to find a bucket", - zap.String("bucket-name", string(bucketName)), + zap.Stringer("bucket-name", bucketType), zap.Stack("stack"), ) } @@ -163,12 +180,12 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte } // UnsafeDelete must be called holding the lock on the tx. -func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { - bucket := t.tx.Bucket(bucketName) +func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) { + bucket := t.tx.Bucket(bucketType.Name()) if bucket == nil { t.backend.lg.Fatal( "failed to find a bucket", - zap.String("bucket-name", string(bucketName)), + zap.Stringer("bucket-name", bucketType), zap.Stack("stack"), ) } @@ -176,7 +193,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { if err != nil { t.backend.lg.Fatal( "failed to delete a key", - zap.String("bucket-name", string(bucketName)), + zap.Stringer("bucket-name", bucketType), zap.Error(err), ) } @@ -184,12 +201,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { } // UnsafeForEach must be called holding the lock on the tx. -func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { - return unsafeForEach(t.tx, bucketName, visitor) +func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error { + return unsafeForEach(t.tx, bucket, visitor) } -func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error { - if b := tx.Bucket(bucket); b != nil { +func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error { + if b := tx.Bucket(bucket.Name()); b != nil { return b.ForEach(visitor) } return nil @@ -253,8 +270,8 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered { tx := &batchTxBuffered{ batchTx: batchTx{backend: backend}, buf: txWriteBuffer{ - txBuffer: txBuffer{make(map[string]*bucketBuffer)}, - seq: true, + txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)}, + bucket2seq: make(map[BucketID]bool), }, } tx.Commit() @@ -316,12 +333,12 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } } -func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) { - t.batchTx.UnsafePut(bucketName, key, value) - t.buf.put(bucketName, key, value) +func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) { + t.batchTx.UnsafePut(bucket, key, value) + t.buf.put(bucket, key, value) } -func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { - t.batchTx.UnsafeSeqPut(bucketName, key, value) - t.buf.putSeq(bucketName, key, value) +func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { + t.batchTx.UnsafeSeqPut(bucket, key, value) + t.buf.putSeq(bucket, key, value) } diff --git a/server/mvcc/backend/batch_tx_test.go b/server/mvcc/backend/batch_tx_test.go index 95375253d1a..75b377fed8e 100644 --- a/server/mvcc/backend/batch_tx_test.go +++ b/server/mvcc/backend/batch_tx_test.go @@ -22,6 +22,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" ) func TestBatchTxPut(t *testing.T) { @@ -33,18 +34,18 @@ func TestBatchTxPut(t *testing.T) { tx.Lock() // create bucket - tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafeCreateBucket(buckets.Test) // put v := []byte("bar") - tx.UnsafePut([]byte("test"), []byte("foo"), v) + tx.UnsafePut(buckets.Test, []byte("foo"), v) tx.Unlock() // check put result before and after tx is committed for k := 0; k < 2; k++ { tx.Lock() - _, gv := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0) + _, gv := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0) tx.Unlock() if !reflect.DeepEqual(gv[0], v) { t.Errorf("v = %s, want %s", string(gv[0]), string(v)) @@ -61,12 +62,12 @@ func TestBatchTxRange(t *testing.T) { tx.Lock() defer tx.Unlock() - tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafeCreateBucket(buckets.Test) // put keys allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")} allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")} for i := range allKeys { - tx.UnsafePut([]byte("test"), allKeys[i], allVals[i]) + tx.UnsafePut(buckets.Test, allKeys[i], allVals[i]) } tests := []struct { @@ -114,7 +115,7 @@ func TestBatchTxRange(t *testing.T) { }, } for i, tt := range tests { - keys, vals := tx.UnsafeRange([]byte("test"), tt.key, tt.endKey, tt.limit) + keys, vals := tx.UnsafeRange(buckets.Test, tt.key, tt.endKey, tt.limit) if !reflect.DeepEqual(keys, tt.wkeys) { t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys) } @@ -131,17 +132,17 @@ func TestBatchTxDelete(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) - tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) - tx.UnsafeDelete([]byte("test"), []byte("foo")) + tx.UnsafeDelete(buckets.Test, []byte("foo")) tx.Unlock() // check put result before and after tx is committed for k := 0; k < 2; k++ { tx.Lock() - ks, _ := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0) + ks, _ := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0) tx.Unlock() if len(ks) != 0 { t.Errorf("keys on foo = %v, want nil", ks) @@ -156,15 +157,15 @@ func TestBatchTxCommit(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) - tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) tx.Unlock() tx.Commit() // check whether put happens via db view backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte("test")) + bucket := tx.Bucket(buckets.Test.Name()) if bucket == nil { t.Errorf("bucket test does not exit") return nil @@ -185,14 +186,14 @@ func TestBatchTxBatchLimitCommit(t *testing.T) { tx := b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket([]byte("test")) - tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) tx.Unlock() // batch limit commit should have been triggered // check whether put happens via db view backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte("test")) + bucket := tx.Bucket(buckets.Test.Name()) if bucket == nil { t.Errorf("bucket test does not exit") return nil diff --git a/server/mvcc/backend/hooks_test.go b/server/mvcc/backend/hooks_test.go index dbf68f910be..03a90dd3d83 100644 --- a/server/mvcc/backend/hooks_test.go +++ b/server/mvcc/backend/hooks_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" ) var ( - bucket = []byte("bucket") + bucket = buckets.Test key = []byte("key") ) diff --git a/server/mvcc/backend/read_tx.go b/server/mvcc/backend/read_tx.go index 3658786277b..56327d52ae6 100644 --- a/server/mvcc/backend/read_tx.go +++ b/server/mvcc/backend/read_tx.go @@ -15,17 +15,15 @@ package backend import ( - "bytes" "math" "sync" bolt "go.etcd.io/bbolt" ) -// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys; -// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket +// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys; +// overwrites on a bucket should only fetch with limit=1, but IsSafeRangeBucket // is known to never overwrite any key so range is safe. -var safeRangeBucket = []byte("key") type ReadTx interface { Lock() @@ -33,8 +31,8 @@ type ReadTx interface { RLock() RUnlock() - UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) - UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error + UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) + UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error } // Base type for readTx and concurrentReadTx to eliminate duplicate functions between these @@ -47,12 +45,12 @@ type baseReadTx struct { // txMu protects accesses to buckets and tx on Range requests. txMu *sync.RWMutex tx *bolt.Tx - buckets map[string]*bolt.Bucket + buckets map[BucketID]*bolt.Bucket // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done. txWg *sync.WaitGroup } -func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { +func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error { dups := make(map[string]struct{}) getDups := func(k, v []byte) error { dups[string(k)] = struct{}{} @@ -64,19 +62,19 @@ func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v } return visitor(k, v) } - if err := baseReadTx.buf.ForEach(bucketName, getDups); err != nil { + if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil { return err } baseReadTx.txMu.Lock() - err := unsafeForEach(baseReadTx.tx, bucketName, visitNoDup) + err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup) baseReadTx.txMu.Unlock() if err != nil { return err } - return baseReadTx.buf.ForEach(bucketName, visitor) + return baseReadTx.buf.ForEach(bucket, visitor) } -func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { +func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) { if endKey == nil { // forbid duplicates for single keys limit = 1 @@ -84,16 +82,16 @@ func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit if limit <= 0 { limit = math.MaxInt64 } - if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) { + if limit > 1 && !bucketType.IsSafeRangeBucket() { panic("do not use unsafeRange on non-keys bucket") } - keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit) + keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit) if int64(len(keys)) == limit { return keys, vals } // find/cache bucket - bn := string(bucketName) + bn := bucketType.ID() baseReadTx.txMu.RLock() bucket, ok := baseReadTx.buckets[bn] baseReadTx.txMu.RUnlock() @@ -101,7 +99,7 @@ func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit if !ok { baseReadTx.txMu.Lock() lockHeld = true - bucket = baseReadTx.tx.Bucket(bucketName) + bucket = baseReadTx.tx.Bucket(bucketType.Name()) baseReadTx.buckets[bn] = bucket } @@ -133,7 +131,7 @@ func (rt *readTx) RUnlock() { rt.mu.RUnlock() } func (rt *readTx) reset() { rt.buf.reset() - rt.buckets = make(map[string]*bolt.Bucket) + rt.buckets = make(map[BucketID]*bolt.Bucket) rt.tx = nil rt.txWg = new(sync.WaitGroup) } diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index 53072543cef..66740024836 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -23,7 +23,7 @@ const bucketBufferInitialSize = 512 // txBuffer handles functionality shared between txWriteBuffer and txReadBuffer. type txBuffer struct { - buckets map[string]*bucketBuffer + buckets map[BucketID]*bucketBuffer } func (txb *txBuffer) reset() { @@ -39,23 +39,42 @@ func (txb *txBuffer) reset() { // txWriteBuffer buffers writes of pending updates that have not yet committed. type txWriteBuffer struct { txBuffer - seq bool + // Map from bucket ID into information whether this bucket is edited + // sequentially (i.e. keys are growing monotonically). + bucket2seq map[BucketID]bool } -func (txw *txWriteBuffer) put(bucket, k, v []byte) { - txw.seq = false - txw.putSeq(bucket, k, v) +func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) { + txw.bucket2seq[bucket.ID()] = false + txw.putInternal(bucket, k, v) } -func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) { - b, ok := txw.buckets[string(bucket)] +func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) { + // TODO: Add (in tests?) verification whether k>b[len(b)] + txw.putInternal(bucket, k, v) +} + +func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) { + b, ok := txw.buckets[bucket.ID()] if !ok { b = newBucketBuffer() - txw.buckets[string(bucket)] = b + txw.buckets[bucket.ID()] = b } b.add(k, v) } +func (txw *txWriteBuffer) reset() { + txw.txBuffer.reset() + for k := range txw.bucket2seq { + v, ok := txw.buckets[k] + if !ok { + delete(txw.bucket2seq, k) + } else if v.used == 0 { + txw.bucket2seq[k] = true + } + } +} + func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { for k, wb := range txw.buckets { rb, ok := txr.buckets[k] @@ -64,7 +83,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { txr.buckets[k] = wb continue } - if !txw.seq && wb.used > 1 { + if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { // assume no duplicate keys sort.Sort(wb) } @@ -82,15 +101,15 @@ type txReadBuffer struct { bufVersion uint64 } -func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { - if b := txr.buckets[string(bucketName)]; b != nil { +func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + if b := txr.buckets[bucket.ID()]; b != nil { return b.Range(key, endKey, limit) } return nil, nil } -func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error { - if b := txr.buckets[string(bucketName)]; b != nil { +func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error { + if b := txr.buckets[bucket.ID()]; b != nil { return b.ForEach(visitor) } return nil @@ -100,7 +119,7 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er func (txr *txReadBuffer) unsafeCopy() txReadBuffer { txrCopy := txReadBuffer{ txBuffer: txBuffer{ - buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)), + buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)), }, bufVersion: 0, } diff --git a/server/mvcc/buckets/bucket.go b/server/mvcc/buckets/bucket.go new file mode 100644 index 00000000000..9214f72f251 --- /dev/null +++ b/server/mvcc/buckets/bucket.go @@ -0,0 +1,80 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "bytes" + + "go.etcd.io/etcd/server/v3/mvcc/backend" +) + +var ( + keyBucketName = []byte("key") + metaBucketName = []byte("meta") + leaseBucketName = []byte("lease") + alarmBucketName = []byte("alarm") + + clusterBucketName = []byte("cluster") + + membersBucketName = []byte("members") + membersRemovedBucketName = []byte("members_removed") + + authBucketName = []byte("auth") + authUsersBucketName = []byte("authUsers") + authRolesBucketName = []byte("authRoles") + + testBucketName = []byte("test") +) + +var ( + Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true}) + Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false}) + Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false}) + Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false}) + Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false}) + + Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false}) + MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false}) + + Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false}) + AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false}) + AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false}) + + Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false}) +) + +type bucket struct { + id backend.BucketID + name []byte + safeRangeBucket bool +} + +func (b bucket) ID() backend.BucketID { return b.id } +func (b bucket) Name() []byte { return b.name } +func (b bucket) String() string { return string(b.Name()) } +func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket } + +var ( + MetaConsistentIndexKeyName = []byte("consistent_index") + MetaTermKeyName = []byte("term") +) + +// DefaultIgnores defines buckets & keys to ignore in hash checking. +func DefaultIgnores(bucket, key []byte) bool { + // consistent index & term might be changed due to v2 internal sync, which + // is not controllable by the user. + return bytes.Compare(bucket, Meta.Name()) == 0 && + (bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0) +} diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 5116b15cbd0..54055ed0552 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -26,17 +26,14 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/schedule" "go.etcd.io/etcd/pkg/v3/traceutil" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) var ( - keyBucketName = []byte("key") - MetaBucketName = cindex.MetaBucketName - scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -123,8 +120,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi tx := s.b.BatchTx() tx.Lock() - tx.UnsafeCreateBucket(keyBucketName) - cindex.UnsafeCreateMetaBucket(tx) + tx.UnsafeCreateBucket(buckets.Key) + tx.UnsafeCreateBucket(buckets.Meta) tx.Unlock() s.b.ForceCommit() @@ -162,7 +159,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { start := time.Now() s.b.ForceCommit() - h, err := s.b.Hash(DefaultIgnores) + h, err := s.b.Hash(buckets.DefaultIgnores) hashSec.Observe(time.Since(start).Seconds()) return h, s.currentRev, err @@ -198,8 +195,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev lower := revision{main: compactRev + 1} h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - h.Write(keyBucketName) - err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error { + h.Write(buckets.Key.Name()) + err = tx.UnsafeForEach(buckets.Key, func(k, v []byte) error { kr := bytesToRev(k) if !upper.GreaterThan(kr) { return nil @@ -242,7 +239,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { tx := s.b.BatchTx() tx.Lock() - tx.UnsafePut(MetaBucketName, scheduledCompactKeyName, rbytes) + tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) tx.Unlock() // ensure that desired compaction is persisted s.b.ForceCommit() @@ -297,18 +294,6 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err return s.compact(trace, rev) } -// DefaultIgnores is a map of keys to ignore in hash checking. -var DefaultIgnores map[backend.IgnoreKey]struct{} - -func init() { - DefaultIgnores = map[backend.IgnoreKey]struct{}{ - // consistent index might be changed due to v2 internal sync, which - // is not controllable by the user. - {Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {}, - {Bucket: string(MetaBucketName), Key: string(cindex.TermKeyName)}: {}, - } -} - func (s *store) Commit() { s.mu.Lock() defer s.mu.Unlock() @@ -352,20 +337,20 @@ func (s *store) restore() error { tx := s.b.BatchTx() tx.Lock() - _, finishedCompactBytes := tx.UnsafeRange(MetaBucketName, finishedCompactKeyName, nil, 0) + _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { s.revMu.Lock() s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main s.lg.Info( "restored last compact revision", - zap.String("meta-bucket-name", string(MetaBucketName)), + zap.Stringer("meta-bucket-name", buckets.Meta), zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), zap.Int64("restored-compact-revision", s.compactMainRev), ) s.revMu.Unlock() } - _, scheduledCompactBytes := tx.UnsafeRange(MetaBucketName, scheduledCompactKeyName, nil, 0) + _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0) scheduledCompact := int64(0) if len(scheduledCompactBytes) != 0 { scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main @@ -375,7 +360,7 @@ func (s *store) restore() error { keysGauge.Set(0) rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) for { - keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) + keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys)) if len(keys) == 0 { break } @@ -436,7 +421,7 @@ func (s *store) restore() error { s.lg.Info( "resume scheduled compaction", - zap.String("meta-bucket-name", string(MetaBucketName)), + zap.Stringer("meta-bucket-name", buckets.Meta), zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), zap.Int64("scheduled-compact-revision", scheduledCompact), ) diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index e056fedcc63..71bd4b7369c 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "time" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -39,11 +40,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc tx := s.b.BatchTx() tx.Lock() - keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit)) + keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit)) for _, key := range keys { rev = bytesToRev(key) if _, ok := keep[rev]; !ok { - tx.UnsafeDelete(keyBucketName, key) + tx.UnsafeDelete(buckets.Key, key) keyCompactions++ } } @@ -51,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc if len(keys) < s.cfg.CompactionBatchLimit { rbytes := make([]byte, 8+1+8) revToBytes(revision{main: compactMainRev}, rbytes) - tx.UnsafePut(MetaBucketName, finishedCompactKeyName, rbytes) + tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes) tx.Unlock() s.lg.Info( "finished scheduled compaction", diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go index 4dbf8291300..062050ed163 100644 --- a/server/mvcc/kvstore_compaction_test.go +++ b/server/mvcc/kvstore_compaction_test.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -74,7 +75,7 @@ func TestScheduleCompaction(t *testing.T) { ibytes := newRevBytes() for _, rev := range revs { revToBytes(rev, ibytes) - tx.UnsafePut(keyBucketName, ibytes, []byte("bar")) + tx.UnsafePut(buckets.Key, ibytes, []byte("bar")) } tx.Unlock() @@ -83,12 +84,12 @@ func TestScheduleCompaction(t *testing.T) { tx.Lock() for _, rev := range tt.wrevs { revToBytes(rev, ibytes) - keys, _ := tx.UnsafeRange(keyBucketName, ibytes, nil, 0) + keys, _ := tx.UnsafeRange(buckets.Key, ibytes, nil, 0) if len(keys) != 1 { t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys)) } } - _, vals := tx.UnsafeRange(MetaBucketName, finishedCompactKeyName, nil, 0) + _, vals := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) revToBytes(revision{main: tt.rev}, ibytes) if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) { t.Errorf("#%d: vals on %v = %+v, want %+v", i, finishedCompactKeyName, vals, w) diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index a60cb8de409..5de0a195cf2 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -37,6 +37,7 @@ import ( "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -148,12 +149,12 @@ func TestStorePut(t *testing.T) { } wact := []testutil.Action{ - {Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}}, + {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}}, } if tt.rr != nil { wact = []testutil.Action{ - {Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}}, + {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}}, } } @@ -228,7 +229,7 @@ func TestStoreRange(t *testing.T) { wstart := newRevBytes() revToBytes(tt.idxr.revs[0], wstart) wact := []testutil.Action{ - {Name: "range", Params: []interface{}{keyBucketName, wstart, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Key, wstart, []byte(nil), int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -303,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) { t.Errorf("#%d: marshal err = %v, want nil", i, err) } wact := []testutil.Action{ - {Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}}, + {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -342,10 +343,10 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ - {Name: "put", Params: []interface{}{MetaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, - {Name: "range", Params: []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}}, - {Name: "delete", Params: []interface{}{keyBucketName, key2}}, - {Name: "put", Params: []interface{}{MetaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}}, + {Name: "delete", Params: []interface{}{buckets.Key, key2}}, + {Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -398,9 +399,9 @@ func TestStoreRestore(t *testing.T) { t.Errorf("current rev = %v, want 5", s.currentRev) } wact := []testutil.Action{ - {Name: "range", Params: []interface{}{MetaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, - {Name: "range", Params: []interface{}{MetaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, - {Name: "range", Params: []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, + {Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -484,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { revToBytes(revision{main: 2}, rbytes) tx := s0.b.BatchTx() tx.Lock() - tx.UnsafePut(MetaBucketName, scheduledCompactKeyName, rbytes) + tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) tx.Unlock() s0.Close() @@ -513,7 +514,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { for i := 0; i < 5; i++ { tx = s.b.BatchTx() tx.Lock() - ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0) tx.Unlock() if len(ks) != 0 { time.Sleep(100 * time.Millisecond) @@ -870,27 +871,27 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } -func (b *fakeBatchTx) Lock() {} -func (b *fakeBatchTx) Unlock() {} -func (b *fakeBatchTx) RLock() {} -func (b *fakeBatchTx) RUnlock() {} -func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {} -func (b *fakeBatchTx) UnsafeDeleteBucket(name []byte) {} -func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { - b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}}) +func (b *fakeBatchTx) Lock() {} +func (b *fakeBatchTx) Unlock() {} +func (b *fakeBatchTx) RLock() {} +func (b *fakeBatchTx) RUnlock() {} +func (b *fakeBatchTx) UnsafeCreateBucket(bucket backend.Bucket) {} +func (b *fakeBatchTx) UnsafeDeleteBucket(bucket backend.Bucket) {} +func (b *fakeBatchTx) UnsafePut(bucket backend.Bucket, key []byte, value []byte) { + b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucket, key, value}}) } -func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { - b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}}) +func (b *fakeBatchTx) UnsafeSeqPut(bucket backend.Bucket, key []byte, value []byte) { + b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucket, key, value}}) } -func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { - b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}}) +func (b *fakeBatchTx) UnsafeRange(bucket backend.Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { + b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucket, key, endKey, limit}}) r := <-b.rangeRespc return r.keys, r.vals } -func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) { - b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}}) +func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) { + b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucket, key}}) } -func (b *fakeBatchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { +func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error { return nil } func (b *fakeBatchTx) Commit() {} @@ -900,17 +901,17 @@ type fakeBackend struct { tx *fakeBatchTx } -func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } -func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } -func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx } -func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } -func (b *fakeBackend) Size() int64 { return 0 } -func (b *fakeBackend) SizeInUse() int64 { return 0 } -func (b *fakeBackend) OpenReadTxN() int64 { return 0 } -func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } -func (b *fakeBackend) ForceCommit() {} -func (b *fakeBackend) Defrag() error { return nil } -func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } +func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } +func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx } +func (b *fakeBackend) Hash(func(bucketName, keyName []byte) bool) (uint32, error) { return 0, nil } +func (b *fakeBackend) Size() int64 { return 0 } +func (b *fakeBackend) SizeInUse() int64 { return 0 } +func (b *fakeBackend) OpenReadTxN() int64 { return 0 } +func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } +func (b *fakeBackend) ForceCommit() {} +func (b *fakeBackend) Defrag() error { return nil } +func (b *fakeBackend) Close() error { return nil } type indexGetResp struct { rev revision diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 42cfb2e2bee..16222817611 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -159,7 +160,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i default: } revToBytes(revpair, revBytes) - _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) + _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0) if len(vs) != 1 { tr.s.lg.Fatal( "range failed to find revision pair", @@ -214,7 +215,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } tw.trace.Step("marshal mvccpb.KeyValue") - tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) + tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d) tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv) tw.trace.Step("store kv pair into bolt db") @@ -275,7 +276,7 @@ func (tw *storeTxnWrite) delete(key []byte) { ) } - tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) + tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d) err = tw.s.kvindex.Tombstone(key, idxRev) if err != nil { tw.storeTxnRead.s.lg.Fatal( diff --git a/server/mvcc/util.go b/server/mvcc/util.go index 25467609054..83cbf44bf84 100644 --- a/server/mvcc/util.go +++ b/server/mvcc/util.go @@ -19,6 +19,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" ) func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { @@ -31,6 +32,6 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { } be.BatchTx().Lock() - be.BatchTx().UnsafePut(keyBucketName, ibytes, d) + be.BatchTx().UnsafePut(buckets.Key, ibytes, d) be.BatchTx().Unlock() } diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go index 63529ed672e..3c7edb33371 100644 --- a/server/mvcc/watchable_store.go +++ b/server/mvcc/watchable_store.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -353,7 +354,7 @@ func (s *watchableStore) syncWatchers() int { // values are actual key-value pairs in backend. tx := s.store.b.ReadTx() tx.RLock() - revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) + revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0) tx.RUnlock() evs := kvsToEvents(s.store.lg, wg, revs, vs) diff --git a/tools/etcd-dump-db/backend.go b/tools/etcd-dump-db/backend.go index 1cc7706d6da..3fe609d9cc4 100644 --- a/tools/etcd-dump-db/backend.go +++ b/tools/etcd-dump-db/backend.go @@ -20,10 +20,10 @@ import ( "path/filepath" "go.etcd.io/etcd/api/v3/authpb" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/server/v3/lease/leasepb" - "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" bolt "go.etcd.io/bbolt" @@ -163,7 +163,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error) func getHash(dbPath string) (hash uint32, err error) { b := backend.NewDefaultBackend(dbPath) - return b.Hash(mvcc.DefaultIgnores) + return b.Hash(buckets.DefaultIgnores) } // TODO: revert by revision and find specified hash value