|
|
|
|
@ -55,6 +55,7 @@ func (b *LocalBackend) tkaSyncIfNeededLocked(nm *netmap.NetworkMap) error { |
|
|
|
|
// If the feature flag is not enabled, pretend we don't exist.
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
ourNodeKey := b.prefs.Persist.PrivateNodeKey.Public() |
|
|
|
|
|
|
|
|
|
isEnabled := b.tka != nil |
|
|
|
|
wantEnabled := nm.TKAEnabled |
|
|
|
|
@ -66,17 +67,16 @@ func (b *LocalBackend) tkaSyncIfNeededLocked(nm *netmap.NetworkMap) error { |
|
|
|
|
|
|
|
|
|
// Regardless of whether we are moving to disabled or enabled, we
|
|
|
|
|
// need information from the tka bootstrap endpoint.
|
|
|
|
|
ourNodeKey := b.prefs.Persist.PrivateNodeKey.Public() |
|
|
|
|
b.mu.Unlock() |
|
|
|
|
bs, err := b.tkaFetchBootstrap(ourNodeKey, ourHead) |
|
|
|
|
b.mu.Lock() |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("fetching bootstrap: %v", err) |
|
|
|
|
return fmt.Errorf("fetching bootstrap: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if wantEnabled && !isEnabled { |
|
|
|
|
if err := b.tkaBootstrapFromGenesisLocked(bs.GenesisAUM); err != nil { |
|
|
|
|
return fmt.Errorf("bootstrap: %v", err) |
|
|
|
|
return fmt.Errorf("bootstrap: %w", err) |
|
|
|
|
} |
|
|
|
|
isEnabled = true |
|
|
|
|
} else if !wantEnabled && isEnabled { |
|
|
|
|
@ -96,7 +96,96 @@ func (b *LocalBackend) tkaSyncIfNeededLocked(nm *netmap.NetworkMap) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if isEnabled && b.tka.authority.Head() != nm.TKAHead { |
|
|
|
|
// TODO(tom): Implement sync
|
|
|
|
|
if err := b.tkaSyncLocked(ourNodeKey); err != nil { |
|
|
|
|
return fmt.Errorf("tka sync: %w", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func toSyncOffer(head string, ancestors []string) (tka.SyncOffer, error) { |
|
|
|
|
var out tka.SyncOffer |
|
|
|
|
if err := out.Head.UnmarshalText([]byte(head)); err != nil { |
|
|
|
|
return tka.SyncOffer{}, fmt.Errorf("head.UnmarshalText: %v", err) |
|
|
|
|
} |
|
|
|
|
out.Ancestors = make([]tka.AUMHash, len(ancestors)) |
|
|
|
|
for i, a := range ancestors { |
|
|
|
|
if err := out.Ancestors[i].UnmarshalText([]byte(a)); err != nil { |
|
|
|
|
return tka.SyncOffer{}, fmt.Errorf("ancestor[%d].UnmarshalText: %v", i, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return out, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// tkaSyncLocked synchronizes TKA state with control. b.mu must be held
|
|
|
|
|
// and tka must be initialized. b.mu will be stepped out of (and back into)
|
|
|
|
|
// during network RPCs.
|
|
|
|
|
func (b *LocalBackend) tkaSyncLocked(ourNodeKey key.NodePublic) error { |
|
|
|
|
offer, err := b.tka.authority.SyncOffer(b.tka.storage) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("offer: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
b.mu.Unlock() |
|
|
|
|
offerResp, err := b.tkaDoSyncOffer(ourNodeKey, offer) |
|
|
|
|
b.mu.Lock() |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("offer RPC: %w", err) |
|
|
|
|
} |
|
|
|
|
controlOffer, err := toSyncOffer(offerResp.Head, offerResp.Ancestors) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("control offer: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if controlOffer.Head == offer.Head { |
|
|
|
|
// We are up to date.
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Compute missing AUMs before we apply any AUMs from the control-plane,
|
|
|
|
|
// so we still submit AUMs to control even if they are not part of the
|
|
|
|
|
// active chain.
|
|
|
|
|
toSendAUMs, err := b.tka.authority.MissingAUMs(b.tka.storage, controlOffer) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("computing missing AUMs: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If we got this far, then we are not up to date. Either the control-plane
|
|
|
|
|
// has updates for us, or we have updates for the control plane.
|
|
|
|
|
//
|
|
|
|
|
// TODO(tom): Do we want to keep processing even if the Inform fails? Need
|
|
|
|
|
// to think through if theres holdback concerns here or not.
|
|
|
|
|
if len(offerResp.MissingAUMs) > 0 { |
|
|
|
|
aums := make([]tka.AUM, len(offerResp.MissingAUMs)) |
|
|
|
|
for i, a := range offerResp.MissingAUMs { |
|
|
|
|
if err := aums[i].Unserialize(a); err != nil { |
|
|
|
|
return fmt.Errorf("MissingAUMs[%d]: %v", i, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := b.tka.authority.Inform(b.tka.storage, aums); err != nil { |
|
|
|
|
return fmt.Errorf("inform failed: %v", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NOTE(tom): We could short-circuit here if our HEAD equals the
|
|
|
|
|
// control-plane's head, but we don't just so control always has a
|
|
|
|
|
// copy of all forks that clients had.
|
|
|
|
|
|
|
|
|
|
b.mu.Unlock() |
|
|
|
|
sendResp, err := b.tkaDoSyncSend(ourNodeKey, toSendAUMs) |
|
|
|
|
b.mu.Lock() |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("send RPC: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var remoteHead tka.AUMHash |
|
|
|
|
if err := remoteHead.UnmarshalText([]byte(sendResp.Head)); err != nil { |
|
|
|
|
return fmt.Errorf("head unmarshal: %v", err) |
|
|
|
|
} |
|
|
|
|
if remoteHead != b.tka.authority.Head() { |
|
|
|
|
b.logf("TKA desync: expected consensus after sync but our head is %v and the control plane's is %v", b.tka.authority.Head(), remoteHead) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
@ -405,3 +494,106 @@ func (b *LocalBackend) tkaFetchBootstrap(ourNodeKey key.NodePublic, head tka.AUM |
|
|
|
|
|
|
|
|
|
return a, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func fromSyncOffer(offer tka.SyncOffer) (head string, ancestors []string, err error) { |
|
|
|
|
headBytes, err := offer.Head.MarshalText() |
|
|
|
|
if err != nil { |
|
|
|
|
return "", nil, fmt.Errorf("head.MarshalText: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ancestors = make([]string, len(offer.Ancestors)) |
|
|
|
|
for i, ancestor := range offer.Ancestors { |
|
|
|
|
hash, err := ancestor.MarshalText() |
|
|
|
|
if err != nil { |
|
|
|
|
return "", nil, fmt.Errorf("ancestor[%d].MarshalText: %v", i, err) |
|
|
|
|
} |
|
|
|
|
ancestors[i] = string(hash) |
|
|
|
|
} |
|
|
|
|
return string(headBytes), ancestors, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// tkaDoSyncOffer sends a /machine/tka/sync/offer RPC to the control plane
|
|
|
|
|
// over noise. This is the first of two RPCs implementing tka synchronization.
|
|
|
|
|
func (b *LocalBackend) tkaDoSyncOffer(ourNodeKey key.NodePublic, offer tka.SyncOffer) (*tailcfg.TKASyncOfferResponse, error) { |
|
|
|
|
head, ancestors, err := fromSyncOffer(offer) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("encoding offer: %v", err) |
|
|
|
|
} |
|
|
|
|
syncReq := tailcfg.TKASyncOfferRequest{ |
|
|
|
|
Version: tailcfg.CurrentCapabilityVersion, |
|
|
|
|
NodeKey: ourNodeKey, |
|
|
|
|
Head: head, |
|
|
|
|
Ancestors: ancestors, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var req bytes.Buffer |
|
|
|
|
if err := json.NewEncoder(&req).Encode(syncReq); err != nil { |
|
|
|
|
return nil, fmt.Errorf("encoding request: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
|
|
|
|
defer cancel() |
|
|
|
|
req2, err := http.NewRequestWithContext(ctx, "GET", "https://unused/machine/tka/sync/offer", &req) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("req: %w", err) |
|
|
|
|
} |
|
|
|
|
res, err := b.DoNoiseRequest(req2) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("resp: %w", err) |
|
|
|
|
} |
|
|
|
|
if res.StatusCode != 200 { |
|
|
|
|
body, _ := io.ReadAll(res.Body) |
|
|
|
|
res.Body.Close() |
|
|
|
|
return nil, fmt.Errorf("request returned (%d): %s", res.StatusCode, string(body)) |
|
|
|
|
} |
|
|
|
|
a := new(tailcfg.TKASyncOfferResponse) |
|
|
|
|
err = json.NewDecoder(res.Body).Decode(a) |
|
|
|
|
res.Body.Close() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("decoding JSON: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return a, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// tkaDoSyncSend sends a /machine/tka/sync/send RPC to the control plane
|
|
|
|
|
// over noise. This is the second of two RPCs implementing tka synchronization.
|
|
|
|
|
func (b *LocalBackend) tkaDoSyncSend(ourNodeKey key.NodePublic, aums []tka.AUM) (*tailcfg.TKASyncSendResponse, error) { |
|
|
|
|
sendReq := tailcfg.TKASyncSendRequest{ |
|
|
|
|
Version: tailcfg.CurrentCapabilityVersion, |
|
|
|
|
NodeKey: ourNodeKey, |
|
|
|
|
MissingAUMs: make([]tkatype.MarshaledAUM, len(aums)), |
|
|
|
|
} |
|
|
|
|
for i, a := range aums { |
|
|
|
|
sendReq.MissingAUMs[i] = a.Serialize() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var req bytes.Buffer |
|
|
|
|
if err := json.NewEncoder(&req).Encode(sendReq); err != nil { |
|
|
|
|
return nil, fmt.Errorf("encoding request: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
|
|
|
|
defer cancel() |
|
|
|
|
req2, err := http.NewRequestWithContext(ctx, "GET", "https://unused/machine/tka/sync/send", &req) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("req: %w", err) |
|
|
|
|
} |
|
|
|
|
res, err := b.DoNoiseRequest(req2) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("resp: %w", err) |
|
|
|
|
} |
|
|
|
|
if res.StatusCode != 200 { |
|
|
|
|
body, _ := io.ReadAll(res.Body) |
|
|
|
|
res.Body.Close() |
|
|
|
|
return nil, fmt.Errorf("request returned (%d): %s", res.StatusCode, string(body)) |
|
|
|
|
} |
|
|
|
|
a := new(tailcfg.TKASyncSendResponse) |
|
|
|
|
err = json.NewDecoder(res.Body).Decode(a) |
|
|
|
|
res.Body.Close() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("decoding JSON: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return a, nil |
|
|
|
|
} |
|
|
|
|
|