tka: implement compaction logic
Signed-off-by: Tom DNetto <tom@tailscale.com>
This commit is contained in:
@@ -5,10 +5,12 @@ package tka
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
"tailscale.com/atomicfile"
|
||||
@@ -53,6 +55,24 @@ type Chonk interface {
|
||||
LastActiveAncestor() (*AUMHash, error)
|
||||
}
|
||||
|
||||
// CompactableChonk implementation are extensions of Chonk, which are
|
||||
// able to be operated by compaction logic to deleted old AUMs.
|
||||
type CompactableChonk interface {
|
||||
Chonk
|
||||
|
||||
// AllAUMs returns all AUMs stored in the chonk.
|
||||
AllAUMs() ([]AUMHash, error)
|
||||
|
||||
// CommitTime returns the time at which the AUM was committed.
|
||||
//
|
||||
// If the AUM does not exist, then os.ErrNotExist is returned.
|
||||
CommitTime(hash AUMHash) (time.Time, error)
|
||||
|
||||
// PurgeAUMs permanently and irrevocably deletes the specified
|
||||
// AUMs from storage.
|
||||
PurgeAUMs(hashes []AUMHash) error
|
||||
}
|
||||
|
||||
// Mem implements in-memory storage of TKA state, suitable for
|
||||
// tests.
|
||||
//
|
||||
@@ -437,3 +457,302 @@ func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error {
|
||||
}
|
||||
return atomicfile.WriteFile(filepath.Join(dir, base), buff.Bytes(), 0644)
|
||||
}
|
||||
|
||||
// CompactionOptions describes tuneables to use when compacting a Chonk.
|
||||
type CompactionOptions struct {
|
||||
// The minimum number of ancestor AUMs to remember. The actual length
|
||||
// of the chain post-compaction may be longer to reach a Checkpoint AUM.
|
||||
MinChain int
|
||||
// The minimum duration to store an AUM before it is a candidate for deletion.
|
||||
MinAge time.Duration
|
||||
}
|
||||
|
||||
// retainState tracks the state of an AUM hash as it is being considered for
|
||||
// deletion.
|
||||
type retainState uint8
|
||||
|
||||
// Valid retainState flags.
|
||||
const (
|
||||
retainStateActive retainState = 1 << iota // The AUM is part of the active chain and less than MinChain hops from HEAD.
|
||||
retainStateYoung // The AUM is younger than MinAge.
|
||||
retainStateLeaf // The AUM is a descendant of an AUM to be retained.
|
||||
retainStateAncestor // The AUM is part of a chain between a retained AUM and the new lastActiveAncestor.
|
||||
retainStateCandidate // The AUM is part of the active chain.
|
||||
|
||||
// retainAUMMask is a bit mask of any bit which should prevent
|
||||
// the deletion of an AUM.
|
||||
retainAUMMask retainState = retainStateActive | retainStateYoung | retainStateLeaf | retainStateAncestor
|
||||
)
|
||||
|
||||
// markActiveChain marks AUMs in the active chain.
|
||||
// All AUMs that are within minChain ancestors of head are
|
||||
// marked retainStateActive, and all remaining ancestors are
|
||||
// marked retainStateCandidate.
|
||||
//
|
||||
// markActiveChain returns the next ancestor AUM which is a checkpoint AUM.
|
||||
func markActiveChain(storage Chonk, verdict map[AUMHash]retainState, minChain int, head AUMHash) (lastActiveAncestor AUMHash, err error) {
|
||||
next, err := storage.AUM(head)
|
||||
if err != nil {
|
||||
return AUMHash{}, err
|
||||
}
|
||||
|
||||
for i := 0; i < minChain; i++ {
|
||||
h := next.Hash()
|
||||
verdict[h] |= retainStateActive
|
||||
|
||||
parent, hasParent := next.Parent()
|
||||
if !hasParent {
|
||||
// Genesis AUM (beginning of time). The chain isnt long enough to need truncating.
|
||||
return h, nil
|
||||
}
|
||||
|
||||
if next, err = storage.AUM(parent); err != nil {
|
||||
if err == os.ErrNotExist {
|
||||
// We've reached the end of the chain we have stored.
|
||||
return h, nil
|
||||
}
|
||||
return AUMHash{}, fmt.Errorf("reading active chain (retainStateActive) (%d): %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
// If we got this far, we have at least minChain AUMs stored, and minChain number
|
||||
// of ancestors have been marked for retention. We now continue to iterate backwards
|
||||
// till we find an AUM which we can compact to (a Checkpoint AUM).
|
||||
for {
|
||||
h := next.Hash()
|
||||
verdict[h] |= retainStateActive
|
||||
if next.MessageKind == AUMCheckpoint {
|
||||
lastActiveAncestor = h
|
||||
break
|
||||
}
|
||||
|
||||
parent, hasParent := next.Parent()
|
||||
if !hasParent {
|
||||
return AUMHash{}, errors.New("reached genesis AUM without finding an appropriate lastActiveAncestor")
|
||||
}
|
||||
if next, err = storage.AUM(parent); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("searching for compaction target: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Mark remaining known ancestors as retainStateCandidate.
|
||||
for {
|
||||
parent, hasParent := next.Parent()
|
||||
if !hasParent {
|
||||
break
|
||||
}
|
||||
verdict[parent] |= retainStateCandidate
|
||||
if next, err = storage.AUM(parent); err != nil {
|
||||
if err == os.ErrNotExist {
|
||||
// We've reached the end of the chain we have stored.
|
||||
break
|
||||
}
|
||||
return AUMHash{}, fmt.Errorf("reading active chain (retainStateCandidate): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return lastActiveAncestor, nil
|
||||
}
|
||||
|
||||
// markYoungAUMs marks all AUMs younger than minAge for retention. All
|
||||
// candidate AUMs must exist in verdict.
|
||||
func markYoungAUMs(storage CompactableChonk, verdict map[AUMHash]retainState, minAge time.Duration) error {
|
||||
minTime := time.Now().Add(-minAge)
|
||||
for h, _ := range verdict {
|
||||
commitTime, err := storage.CommitTime(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if commitTime.After(minTime) {
|
||||
verdict[h] |= retainStateYoung
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// markAncestorIntersectionAUMs walks backwards from all AUMs to be retained,
|
||||
// ensuring they intersect with candidateAncestor. All AUMs between a retained
|
||||
// AUM and candidateAncestor are marked for retention.
|
||||
//
|
||||
// If there is no intersection between candidateAncestor and the ancestors of
|
||||
// a retained AUM (this can happen if a retained AUM intersects the main chain
|
||||
// before candidateAncestor) then candidate ancestor is recomputed based on
|
||||
// the new oldest intersection.
|
||||
//
|
||||
// The final value for lastActiveAncestor is returned.
|
||||
func markAncestorIntersectionAUMs(storage Chonk, verdict map[AUMHash]retainState, candidateAncestor AUMHash) (lastActiveAncestor AUMHash, err error) {
|
||||
toScan := make([]AUMHash, 0, len(verdict))
|
||||
for h, v := range verdict {
|
||||
if (v & retainAUMMask) == 0 {
|
||||
continue // not marked for retention, so dont need to consider it
|
||||
}
|
||||
if h == candidateAncestor {
|
||||
continue
|
||||
}
|
||||
toScan = append(toScan, h)
|
||||
}
|
||||
|
||||
var didAdjustCandidateAncestor bool
|
||||
for len(toScan) > 0 {
|
||||
nextIterScan := make([]AUMHash, 0, len(verdict))
|
||||
for _, h := range toScan {
|
||||
if verdict[h]&retainStateAncestor != 0 {
|
||||
// This AUM and its ancestors have already been iterated.
|
||||
continue
|
||||
}
|
||||
verdict[h] |= retainStateAncestor
|
||||
|
||||
a, err := storage.AUM(h)
|
||||
if err != nil {
|
||||
return AUMHash{}, fmt.Errorf("reading %v: %w", h, err)
|
||||
}
|
||||
parent, hasParent := a.Parent()
|
||||
if !hasParent {
|
||||
return AUMHash{}, errors.New("reached genesis AUM without intersecting with candidate ancestor")
|
||||
}
|
||||
|
||||
if verdict[parent]&retainAUMMask != 0 {
|
||||
// Includes candidateAncestor (has retainStateActive set)
|
||||
continue
|
||||
}
|
||||
if verdict[parent]&retainStateCandidate != 0 {
|
||||
// We've intersected with the active chain but haven't done so through
|
||||
// candidateAncestor. That means that we intersect the active chain
|
||||
// before candidateAncestor, hence candidateAncestor actually needs
|
||||
// to be earlier than it is now.
|
||||
candidateAncestor = parent
|
||||
didAdjustCandidateAncestor = true
|
||||
verdict[parent] |= retainStateAncestor
|
||||
|
||||
// There could be AUMs on the active chain between our new candidateAncestor
|
||||
// and the old one, make sure they are marked as retained.
|
||||
next := parent
|
||||
childLoop:
|
||||
for {
|
||||
children, err := storage.ChildAUMs(next)
|
||||
if err != nil {
|
||||
return AUMHash{}, fmt.Errorf("reading children %v: %w", next, err)
|
||||
}
|
||||
// While there can be many children of an AUM, there can only be
|
||||
// one child on the active chain (it will have retainStateCandidate set).
|
||||
for _, a := range children {
|
||||
h := a.Hash()
|
||||
if v := verdict[h]; v&retainStateCandidate != 0 && v&retainStateActive == 0 {
|
||||
verdict[h] |= retainStateAncestor
|
||||
next = h
|
||||
continue childLoop
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
nextIterScan = append(nextIterScan, parent)
|
||||
}
|
||||
toScan = nextIterScan
|
||||
}
|
||||
|
||||
// If candidateAncestor was adjusted backwards, then it may not be a checkpoint
|
||||
// (and hence a valid compaction candidate). If so, iterate backwards and adjust
|
||||
// the candidateAncestor till we find a checkpoint.
|
||||
if didAdjustCandidateAncestor {
|
||||
var next AUM
|
||||
if next, err = storage.AUM(candidateAncestor); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("searching for compaction target: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
h := next.Hash()
|
||||
verdict[h] |= retainStateActive
|
||||
if next.MessageKind == AUMCheckpoint {
|
||||
candidateAncestor = h
|
||||
break
|
||||
}
|
||||
|
||||
parent, hasParent := next.Parent()
|
||||
if !hasParent {
|
||||
return AUMHash{}, errors.New("reached genesis AUM without finding an appropriate candidateAncestor")
|
||||
}
|
||||
if next, err = storage.AUM(parent); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("searching for compaction target: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return candidateAncestor, nil
|
||||
}
|
||||
|
||||
// markDescendantAUMs marks all children of a retained AUM as retained.
|
||||
func markDescendantAUMs(storage Chonk, verdict map[AUMHash]retainState) error {
|
||||
toScan := make([]AUMHash, 0, len(verdict))
|
||||
for h, v := range verdict {
|
||||
if v&retainAUMMask == 0 {
|
||||
continue // not marked, so dont need to mark descendants
|
||||
}
|
||||
toScan = append(toScan, h)
|
||||
}
|
||||
|
||||
for len(toScan) > 0 {
|
||||
nextIterScan := make([]AUMHash, 0, len(verdict))
|
||||
for _, h := range toScan {
|
||||
if verdict[h]&retainStateLeaf != 0 {
|
||||
// This AUM and its decendants have already been marked.
|
||||
continue
|
||||
}
|
||||
verdict[h] |= retainStateLeaf
|
||||
|
||||
children, err := storage.ChildAUMs(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, a := range children {
|
||||
nextIterScan = append(nextIterScan, a.Hash())
|
||||
}
|
||||
}
|
||||
toScan = nextIterScan
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compact deletes old AUMs from storage, based on the parameters given in opts.
|
||||
func Compact(storage CompactableChonk, head AUMHash, opts CompactionOptions) (lastActiveAncestor AUMHash, err error) {
|
||||
if opts.MinChain == 0 {
|
||||
return AUMHash{}, errors.New("opts.MinChain must be set")
|
||||
}
|
||||
if opts.MinAge == 0 {
|
||||
return AUMHash{}, errors.New("opts.MinAge must be set")
|
||||
}
|
||||
|
||||
all, err := storage.AllAUMs()
|
||||
if err != nil {
|
||||
return AUMHash{}, fmt.Errorf("AllAUMs: %w", err)
|
||||
}
|
||||
verdict := make(map[AUMHash]retainState, len(all))
|
||||
for _, h := range all {
|
||||
verdict[h] = 0
|
||||
}
|
||||
|
||||
if lastActiveAncestor, err = markActiveChain(storage, verdict, opts.MinChain, head); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("marking active chain: %w", err)
|
||||
}
|
||||
if err := markYoungAUMs(storage, verdict, opts.MinAge); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("marking young AUMs: %w", err)
|
||||
}
|
||||
if err := markDescendantAUMs(storage, verdict); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("marking decendant AUMs: %w", err)
|
||||
}
|
||||
if lastActiveAncestor, err = markAncestorIntersectionAUMs(storage, verdict, lastActiveAncestor); err != nil {
|
||||
return AUMHash{}, fmt.Errorf("marking ancestor intersection: %w", err)
|
||||
}
|
||||
|
||||
toDelete := make([]AUMHash, 0, len(verdict))
|
||||
for h, v := range verdict {
|
||||
if v&retainAUMMask == 0 { // no retention set
|
||||
toDelete = append(toDelete, h)
|
||||
}
|
||||
}
|
||||
|
||||
return lastActiveAncestor, storage.PurgeAUMs(toDelete)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user