tka: compact TKA storage on startup
Signed-off-by: Tom DNetto <tom@tailscale.com>
This commit is contained in:
+99
-5
@@ -200,6 +200,11 @@ func ChonkDir(dir string) (*FS, error) {
|
||||
if !stat.IsDir() {
|
||||
return nil, fmt.Errorf("chonk directory %q is a file", dir)
|
||||
}
|
||||
|
||||
// TODO(tom): *FS marks AUMs as deleted but does not actually
|
||||
// delete them, to avoid data loss in the event of a bug.
|
||||
// Implement deletion after we are fairly sure in the implementation.
|
||||
|
||||
return &FS{base: dir}, nil
|
||||
}
|
||||
|
||||
@@ -213,8 +218,17 @@ func ChonkDir(dir string) (*FS, error) {
|
||||
// much smaller than JSON for AUMs. The 'keyasint' thing isn't essential
|
||||
// but again it saves a bunch of bytes.
|
||||
type fsHashInfo struct {
|
||||
Children []AUMHash `cbor:"1,keyasint"`
|
||||
AUM *AUM `cbor:"2,keyasint"`
|
||||
Children []AUMHash `cbor:"1,keyasint"`
|
||||
AUM *AUM `cbor:"2,keyasint"`
|
||||
CreatedUnix int64 `cbor:"3,keyasint,omitempty"`
|
||||
|
||||
// PurgedUnix is set when the AUM is deleted. The value is
|
||||
// the unix epoch at the time it was deleted.
|
||||
//
|
||||
// While a non-zero PurgedUnix symbolizes the AUM is deleted,
|
||||
// the fsHashInfo entry can continue to exist to track children
|
||||
// of this AUMHash.
|
||||
PurgedUnix int64 `cbor:"4,keyasint,omitempty"`
|
||||
}
|
||||
|
||||
// aumDir returns the directory an AUM is stored in, and its filename
|
||||
@@ -238,12 +252,45 @@ func (c *FS) AUM(hash AUMHash) (AUM, error) {
|
||||
}
|
||||
return AUM{}, err
|
||||
}
|
||||
if info.AUM == nil {
|
||||
if info.AUM == nil || info.PurgedUnix > 0 {
|
||||
return AUM{}, os.ErrNotExist
|
||||
}
|
||||
return *info.AUM, nil
|
||||
}
|
||||
|
||||
// CommitTime returns the time at which the AUM was committed.
|
||||
//
|
||||
// If the AUM does not exist, then os.ErrNotExist is returned.
|
||||
func (c *FS) CommitTime(h AUMHash) (time.Time, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
info, err := c.get(h)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return time.Time{}, os.ErrNotExist
|
||||
}
|
||||
return time.Time{}, err
|
||||
}
|
||||
if info.PurgedUnix > 0 {
|
||||
return time.Time{}, os.ErrNotExist
|
||||
}
|
||||
if info.CreatedUnix > 0 {
|
||||
return time.Unix(info.CreatedUnix, 0), nil
|
||||
}
|
||||
|
||||
// If we got this far, the AUM exists but CreatedUnix is not
|
||||
// set, presumably because this AUM was committed using a version
|
||||
// of tailscaled that pre-dates the introduction of CreatedUnix.
|
||||
// As such, we use the file modification time as a suitable analog.
|
||||
dir, base := c.aumDir(h)
|
||||
s, err := os.Stat(filepath.Join(dir, base))
|
||||
if err != nil {
|
||||
return time.Time{}, nil
|
||||
}
|
||||
return s.ModTime(), nil
|
||||
}
|
||||
|
||||
// AUM returns any known AUMs with a specific parent hash.
|
||||
func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
|
||||
c.mu.RLock()
|
||||
@@ -257,6 +304,9 @@ func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// NOTE(tom): We don't check PurgedUnix here because 'purged'
|
||||
// only applies to that specific AUM (i.e. info.AUM) and not to
|
||||
// any information about children stored against that hash.
|
||||
|
||||
out := make([]AUM, len(info.Children))
|
||||
for i, h := range info.Children {
|
||||
@@ -265,7 +315,7 @@ func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
|
||||
// We expect any AUM recorded as a child on its parent to exist.
|
||||
return nil, fmt.Errorf("reading child %d of %x: %v", i, h, err)
|
||||
}
|
||||
if c.AUM == nil {
|
||||
if c.AUM == nil || c.PurgedUnix > 0 {
|
||||
return nil, fmt.Errorf("child %d of %x: AUM not stored", i, h)
|
||||
}
|
||||
out[i] = *c.AUM
|
||||
@@ -309,13 +359,27 @@ func (c *FS) Heads() ([]AUM, error) {
|
||||
|
||||
out := make([]AUM, 0, 6) // 6 is arbitrary.
|
||||
err := c.scanHashes(func(info *fsHashInfo) {
|
||||
if len(info.Children) == 0 && info.AUM != nil {
|
||||
if len(info.Children) == 0 && info.AUM != nil && info.PurgedUnix == 0 {
|
||||
out = append(out, *info.AUM)
|
||||
}
|
||||
})
|
||||
return out, err
|
||||
}
|
||||
|
||||
// AllAUMs returns all AUMs stored in the chonk.
|
||||
func (c *FS) AllAUMs() ([]AUMHash, error) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
out := make([]AUMHash, 0, 6) // 6 is arbitrary.
|
||||
err := c.scanHashes(func(info *fsHashInfo) {
|
||||
if info.AUM != nil && info.PurgedUnix == 0 {
|
||||
out = append(out, info.AUM.Hash())
|
||||
}
|
||||
})
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error {
|
||||
prefixDirs, err := os.ReadDir(c.base)
|
||||
if err != nil {
|
||||
@@ -411,6 +475,7 @@ func (c *FS) CommitVerifiedAUMs(updates []AUM) error {
|
||||
}
|
||||
|
||||
err := c.commit(h, func(info *fsHashInfo) {
|
||||
info.PurgedUnix = 0 // just in-case it was set for some reason
|
||||
info.AUM = &aum
|
||||
})
|
||||
if err != nil {
|
||||
@@ -421,6 +486,31 @@ func (c *FS) CommitVerifiedAUMs(updates []AUM) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PurgeAUMs marks the specified AUMs for deletion from storage.
|
||||
func (c *FS) PurgeAUMs(hashes []AUMHash) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for i, h := range hashes {
|
||||
stored, err := c.get(h)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading %d (%x): %w", i, h, err)
|
||||
}
|
||||
if stored.AUM == nil || stored.PurgedUnix > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
err = c.commit(h, func(info *fsHashInfo) {
|
||||
info.PurgedUnix = now.Unix()
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("committing purge[%d] (%x): %w", i, h, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// commit calls the provided updater function to record changes relevant
|
||||
// to the given hash. The caller is expected to update the AUM and
|
||||
// Children fields, as relevant.
|
||||
@@ -430,6 +520,7 @@ func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error {
|
||||
existing, err := c.get(h)
|
||||
switch {
|
||||
case os.IsNotExist(err):
|
||||
toCommit.CreatedUnix = time.Now().Unix()
|
||||
case err != nil:
|
||||
return err
|
||||
default:
|
||||
@@ -754,5 +845,8 @@ func Compact(storage CompactableChonk, head AUMHash, opts CompactionOptions) (la
|
||||
}
|
||||
}
|
||||
|
||||
if err := storage.SetLastActiveAncestor(lastActiveAncestor); err != nil {
|
||||
return AUMHash{}, err
|
||||
}
|
||||
return lastActiveAncestor, storage.PurgeAUMs(toDelete)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user