Skip to content

Commit

Permalink
Update etcdutl migrate command: load wal records from the latest snap…
Browse files Browse the repository at this point in the history
…shot

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 7, 2025
1 parent aac7ef6 commit d17821f
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 1 deletion.
28 changes: 27 additions & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package etcdutl

import (
"errors"
"fmt"
"strings"

Expand All @@ -24,6 +25,7 @@ import (

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
Expand Down Expand Up @@ -95,7 +97,11 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)

walPath := datadir.ToWALDir(o.dataDir)
w, err := wal.OpenForRead(c.lg, walPath, walpb.Snapshot{})
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
}
Expand All @@ -108,6 +114,26 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return c, nil
}

func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
walPath := datadir.ToWALDir(dataDir)
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
if err != nil {
return walpb.Snapshot{}, err
}

ss := snap.New(lg, datadir.ToSnapDir(dataDir))
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return walpb.Snapshot{}, err
}

var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
return walsnap, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
Expand Down
143 changes: 143 additions & 0 deletions etcdutl/etcdutl/migrate_command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdutl

import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
)

func TestGetLatestWalSnap(t *testing.T) {
testCases := []struct {
name string
walSnaps []walpb.Snapshot
snapshots []raftpb.Snapshot
expectedLatestWALSnap walpb.Snapshot
}{
{
name: "wal snapshot records match the snapshot files",
walSnaps: []walpb.Snapshot{
{Index: 10, Term: 2},
{Index: 20, Term: 3},
{Index: 30, Term: 5},
},
snapshots: []raftpb.Snapshot{
{Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}},
{Metadata: raftpb.SnapshotMetadata{Index: 20, Term: 3}},
{Metadata: raftpb.SnapshotMetadata{Index: 30, Term: 5}},
},
expectedLatestWALSnap: walpb.Snapshot{Index: 30, Term: 5},
},
{
name: "there are orphan snapshot files",
walSnaps: []walpb.Snapshot{
{Index: 10, Term: 2},
{Index: 20, Term: 3},
{Index: 35, Term: 5},
},
snapshots: []raftpb.Snapshot{
{Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}},
{Metadata: raftpb.SnapshotMetadata{Index: 20, Term: 3}},
{Metadata: raftpb.SnapshotMetadata{Index: 35, Term: 5}},
{Metadata: raftpb.SnapshotMetadata{Index: 40, Term: 6}},
{Metadata: raftpb.SnapshotMetadata{Index: 50, Term: 7}},
},
expectedLatestWALSnap: walpb.Snapshot{Index: 35, Term: 5},
},
{
name: "there are orphan snapshot records in wal file",
walSnaps: []walpb.Snapshot{
{Index: 10, Term: 2},
{Index: 20, Term: 3},
{Index: 30, Term: 4},
{Index: 45, Term: 5},
{Index: 55, Term: 6},
},
snapshots: []raftpb.Snapshot{
{Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}},
{Metadata: raftpb.SnapshotMetadata{Index: 20, Term: 3}},
{Metadata: raftpb.SnapshotMetadata{Index: 30, Term: 4}},
},
expectedLatestWALSnap: walpb.Snapshot{Index: 30, Term: 4},
},
{
name: "wal snapshot records do not match the snapshot files at all",
walSnaps: []walpb.Snapshot{
{Index: 10, Term: 2},
{Index: 20, Term: 3},
{Index: 30, Term: 4},
},
snapshots: []raftpb.Snapshot{
{Metadata: raftpb.SnapshotMetadata{Index: 40, Term: 5}},
{Metadata: raftpb.SnapshotMetadata{Index: 50, Term: 6}},
},
expectedLatestWALSnap: walpb.Snapshot{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dataDir := t.TempDir()
lg := zap.NewNop()

require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir)))
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir)))
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir)))

// populate wal file
w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal(
&etcdserverpb.Metadata{
NodeID: 1,
ClusterID: 2,
},
))
require.NoError(t, err)

for _, walSnap := range tc.walSnaps {
walSnap.ConfState = &raftpb.ConfState{Voters: []uint64{1}}
walErr := w.SaveSnapshot(walSnap)
require.NoError(t, walErr)
walErr = w.Save(raftpb.HardState{Term: walSnap.Term, Commit: walSnap.Index, Vote: 1}, nil)
require.NoError(t, walErr)
}
err = w.Close()
require.NoError(t, err)

// generate snapshot files
ss := snap.New(lg, datadir.ToSnapDir(dataDir))
for _, snap := range tc.snapshots {
snap.Metadata.ConfState = raftpb.ConfState{Voters: []uint64{1}}
snapErr := ss.SaveSnap(snap)
require.NoError(t, snapErr)
}

walSnap, err := getLatestWALSnap(lg, dataDir)
require.NoError(t, err)

require.Equal(t, tc.expectedLatestWALSnap, walSnap)
})
}
}

0 comments on commit d17821f

Please sign in to comment.