Skip to content

Commit

Permalink
state: Implement a serial number for states.
Browse files Browse the repository at this point in the history
* Basic idea is to have a "generation number" like serial, that will let
us deal with maintenance by detecting compacted / aggregated state.

* Initial serial is the repository id to avoid any race issues with
concurrent first time backups.
  • Loading branch information
mathieu-plak committed Feb 3, 2025
1 parent 9a5bd20 commit 6a3128e
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 36 deletions.
6 changes: 3 additions & 3 deletions caching/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (c *_RepositoryCache) DelState(stateID objects.Checksum) error {
return c.delete("__state__", fmt.Sprintf("%x", stateID))
}

func (c *_RepositoryCache) GetStates() ([]objects.Checksum, error) {
ret := make([]objects.Checksum, 0)
func (c *_RepositoryCache) GetStates() (map[objects.Checksum][]byte, error) {
ret := make(map[objects.Checksum][]byte, 0)
iter := c.db.NewIterator(nil, nil)
defer iter.Release()

Expand All @@ -93,7 +93,7 @@ func (c *_RepositoryCache) GetStates() ([]objects.Checksum, error) {
fmt.Printf("Error decoding state ID: %v\n", err)
return nil, err
}
ret = append(ret, stateID)
ret[stateID] = iter.Value()
}

return ret, nil
Expand Down
2 changes: 1 addition & 1 deletion caching/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (c *ScanCache) GetState(stateID objects.Checksum) ([]byte, error) {
panic("GetState should never be used on the ScanCache backend")
}

func (c *ScanCache) GetStates() ([]objects.Checksum, error) {
func (c *ScanCache) GetStates() (map[objects.Checksum][]byte, error) {
panic("GetStates should never be used on the ScanCache backend")
}

Expand Down
2 changes: 1 addition & 1 deletion caching/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type StateCache interface {
HasState(stateID objects.Checksum) (bool, error)
GetState(stateID objects.Checksum) ([]byte, error)
DelState(stateID objects.Checksum) error
GetStates() ([]objects.Checksum, error)
GetStates() (map[objects.Checksum][]byte, error)

PutDelta(blobType packfile.Type, blobCsum objects.Checksum, data []byte) error
GetDelta(blobType packfile.Type, blobCsum objects.Checksum) ([]byte, error)
Expand Down
7 changes: 1 addition & 6 deletions cmd/plakar/subcommands/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,7 @@ func info_state(repo *repository.Repository, args []string) error {

fmt.Printf("Version: %d.%d.%d\n", st.Metadata.Version/100, (st.Metadata.Version/10)%10, st.Metadata.Version%10)
fmt.Printf("Creation: %s\n", st.Metadata.Timestamp)
if len(st.Metadata.Extends) > 0 {
fmt.Printf("Extends:\n")
for _, stateID := range st.Metadata.Extends {
fmt.Printf(" %x\n", stateID)
}
}
fmt.Printf("State serial: %s\n", st.Metadata.Serial)

printBlobs := func(name string, Type packfile.Type) {
for snapshot, err := range st.ListObjectsOfType(Type) {
Expand Down
13 changes: 12 additions & 1 deletion repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
_ "github.com/PlakarKorp/go-cdc-chunkers/chunkers/fastcdc"
_ "github.com/PlakarKorp/go-cdc-chunkers/chunkers/ultracdc"
"github.com/PlakarKorp/plakar/appcontext"
"github.com/PlakarKorp/plakar/caching"
"github.com/PlakarKorp/plakar/compression"
"github.com/PlakarKorp/plakar/encryption"
"github.com/PlakarKorp/plakar/hashing"
Expand Down Expand Up @@ -54,6 +55,7 @@ func New(ctx *appcontext.AppContext, store storage.Store, secret []byte) (*Repos
if err := r.RebuildState(); err != nil {
return nil, err
}

return r, nil
}

Expand Down Expand Up @@ -107,7 +109,7 @@ func (r *Repository) RebuildState() error {
// build delta of local and remote states
localStatesMap := make(map[objects.Checksum]struct{})
outdatedStates := make([]objects.Checksum, 0)
for _, stateID := range localStates {
for stateID := range localStates {
localStatesMap[stateID] = struct{}{}

if _, exists := remoteStatesMap[stateID]; !exists {
Expand Down Expand Up @@ -141,6 +143,11 @@ func (r *Repository) RebuildState() error {
}

r.state = aggregatedState

// The first Serial id is our repository ID, this allows us to deal
// naturally with concurrent first backups.
r.state.UpdateSerialOr(r.configuration.RepositoryID)

return nil
}

Expand Down Expand Up @@ -270,6 +277,10 @@ func (r *Repository) Chunker(rd io.ReadCloser) (*chunkers.Chunker, error) {
})
}

func (r *Repository) NewStateDelta(cache *caching.ScanCache) *state.LocalState {
return r.state.Derive(cache)
}

func (r *Repository) Location() string {
return r.store.Location()
}
Expand Down
76 changes: 53 additions & 23 deletions repository/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/PlakarKorp/plakar/caching"
"github.com/PlakarKorp/plakar/objects"
"github.com/PlakarKorp/plakar/packfile"
"github.com/google/uuid"
"github.com/vmihailenco/msgpack/v5"
)

Expand All @@ -41,10 +42,10 @@ const (
)

type Metadata struct {
Version uint32 `msgpack:"version"`
Timestamp time.Time `msgpack:"timestamp"`
Aggregate bool `msgpack:"aggregate"`
Extends []objects.Checksum `msgpack:"extends"`
Version uint32 `msgpack:"version"`
Timestamp time.Time `msgpack:"timestamp"`
Aggregate bool `msgpack:"aggregate"`
Serial uuid.UUID `msgpack:"serial"`
}

type Location struct {
Expand Down Expand Up @@ -93,7 +94,6 @@ func NewLocalState(cache caching.StateCache) *LocalState {
Version: VERSION,
Timestamp: time.Now(),
Aggregate: false,
Extends: []objects.Checksum{},
},
cache: cache,
}
Expand All @@ -109,6 +109,48 @@ func FromStream(rd io.Reader, cache caching.StateCache) (*LocalState, error) {
}
}

// Derive constructs a new state backed by *cache*, keeping the same serial as previous one.
// Mainly used to construct Delta states when backing up.
func (ls *LocalState) Derive(cache caching.StateCache) *LocalState {
st := NewLocalState(cache)
st.Metadata.Serial = ls.Metadata.Serial

return st
}

// Finds the latest (current) serial in the aggregate state, and if none sets
// it to the provided one.
func (ls *LocalState) UpdateSerialOr(serial uuid.UUID) error {
var latestID *objects.Checksum = nil
var latestMT *Metadata = nil

states, err := ls.cache.GetStates()
if err != nil {
return err
}

for stateID, buf := range states {
mt, err := MetadataFromBytes(buf)

if err != nil {
return err
}

if latestID == nil || latestMT.Timestamp.Before(mt.Timestamp) {
latestID = &stateID
latestMT = mt
}
}

if latestMT != nil {
ls.Metadata.Serial = latestMT.Serial
} else {
ls.Metadata.Serial = serial
}

return nil
}

/* Insert the state denotated by stateID and its associated delta entries read from rd */
func (ls *LocalState) InsertState(stateID objects.Checksum, rd io.Reader) error {
has, err := ls.HasState(stateID)
Expand Down Expand Up @@ -182,13 +224,8 @@ func (ls *LocalState) SerializeToStream(w io.Writer) error {
return fmt.Errorf("failed to write aggregate flag: %w", err)
}
}
if err := writeUint64(uint64(len(ls.Metadata.Extends))); err != nil {
return fmt.Errorf("failed to write extends length: %w", err)
}
for _, checksum := range ls.Metadata.Extends {
if _, err := w.Write(checksum[:]); err != nil {
return fmt.Errorf("failed to write checksum: %w", err)
}
if _, err := w.Write(ls.Metadata.Serial[:]); err != nil {
return fmt.Errorf("failed to write serial flag: %w", err)
}

return nil
Expand Down Expand Up @@ -308,18 +345,11 @@ func (ls *LocalState) deserializeFromStream(r io.Reader) error {
}
ls.Metadata.Aggregate = aggregate[0] == 1

extendsLen, err := readUint64()
if err != nil {
return fmt.Errorf("failed to read extends length: %w", err)
}
ls.Metadata.Extends = make([]objects.Checksum, extendsLen)
for i := uint64(0); i < extendsLen; i++ {
var checksum objects.Checksum
if _, err := io.ReadFull(r, checksum[:]); err != nil {
return fmt.Errorf("failed to read checksum: %w", err)
}
ls.Metadata.Extends[i] = checksum
serial := make([]byte, len(uuid.UUID{}))
if _, err := io.ReadFull(r, serial); err != nil {
return fmt.Errorf("failed to read serial: %w", err)
}
ls.Metadata.Serial = uuid.UUID(serial)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(repo *repository.Repository) (*Snapshot, error) {
packerChanDone: make(chan bool),
}

snap.deltaState = state.NewLocalState(scanCache)
snap.deltaState = repo.NewStateDelta(scanCache)

if snap.AppContext().Identity != uuid.Nil {
snap.Header.Identity.Identifier = snap.AppContext().Identity
Expand Down

0 comments on commit 6a3128e

Please sign in to comment.