From 03c3551ee50921cd4dba46e8fe08a6a49b0e9920 Mon Sep 17 00:00:00 2001 From: Jonathan Nobels Date: Thu, 9 Apr 2026 15:45:41 -0400 Subject: [PATCH] ipn/ipnlocal: add netmap mutations to the ipn bus (#19120) ipn/local: add netmap mutations to the ipn bus updates tailscale/tailscale#1909 This adds a new new NotifyWatchOpt that allows watchers to receive PeerChange events (derived from node mutations) on the IPN bus in lieu of a complete netmap. We'll continue to send the full netmap for any map response that includes it, but for mutations, sending PeerChange events gives the client the option to manage it's own models more selectively and cuts way down on json serialization overhead. On chatty tailnets, this will vastly reduce the amount of chatter on the bus. This change should be backwards compatible, it is purely additive. Clients that subscribe to NotifyNetmap will get the full netmap for every delta. New clients can omit that and instead opt into NotifyPeerChanges. Signed-off-by: Jonathan Nobels --- ipn/backend.go | 19 ++++++- ipn/ipnlocal/bus.go | 57 ++++++++++++++++++++- ipn/ipnlocal/bus_test.go | 104 ++++++++++++++++++++++++++++++++++++++- ipn/ipnlocal/local.go | 75 +++++++++++++++++++++++++--- 4 files changed, 242 insertions(+), 13 deletions(-) diff --git a/ipn/backend.go b/ipn/backend.go index a5830565b..7ea7c92b4 100644 --- a/ipn/backend.go +++ b/ipn/backend.go @@ -87,6 +87,11 @@ const ( NotifyInitialSuggestedExitNode NotifyWatchOpt = 1 << 10 // if set, the first Notify message (sent immediately) will contain the current SuggestedExitNode if available NotifyInitialClientVersion NotifyWatchOpt = 1 << 11 // if set, the first Notify message (sent immediately) will contain the current ClientVersion if available and if update checks are enabled + + // NotifyPeerChanges, if set, causes netmap delta updates to be sent as [tailcfg.PeerChange] rather than a full NetMap. + // Full netmap responses from the control plane are still sent as a full NetMap. PeerChanges are only sent to sessions + // that have opted in to this mode. + NotifyPeerChanges NotifyWatchOpt = 1 << 12 ) // Notify is a communication from a backend (e.g. tailscaled) to a frontend @@ -112,8 +117,15 @@ type Notify struct { State *State // if non-nil, the new or current IPN state Prefs *PrefsView // if non-nil && Valid, the new or current preferences NetMap *netmap.NetworkMap // if non-nil, the new or current netmap - Engine *EngineStatus // if non-nil, the new or current wireguard stats - BrowseToURL *string // if non-nil, UI should open a browser right now + + // PeerChanges, if non-nil, is a list of [tailcfg.PeerChange] that have occurred since the last + // full netmap update. This is sent in lieu of a full NetMap when [NotifyPeerChanges] is set in + // the session's mask and a netmap update is derived from an incremental MapResponse. + // Full MapResponse updates from the control plane are sent as a full NetMap. + PeerChanges []*tailcfg.PeerChange `json:",omitzero"` + + Engine *EngineStatus // if non-nil, the new or current wireguard stats + BrowseToURL *string // if non-nil, UI should open a browser right now // FilesWaiting if non-nil means that files are buffered in // the Tailscale daemon and ready for local transfer to the @@ -184,6 +196,9 @@ func (n Notify) String() string { if n.NetMap != nil { sb.WriteString("NetMap{...} ") } + if n.PeerChanges != nil { + fmt.Fprintf(&sb, "PeerChanges(%d) ", len(n.PeerChanges)) + } if n.Engine != nil { fmt.Fprintf(&sb, "wg=%v ", *n.Engine) } diff --git a/ipn/ipnlocal/bus.go b/ipn/ipnlocal/bus.go index 6061f7223..de04fd09a 100644 --- a/ipn/ipnlocal/bus.go +++ b/ipn/ipnlocal/bus.go @@ -8,6 +8,7 @@ import ( "time" "tailscale.com/ipn" + "tailscale.com/tailcfg" "tailscale.com/tstime" ) @@ -116,8 +117,8 @@ func (s *rateLimitingBusSender) Run(ctx context.Context, ch <-chan *ipn.Notify) } } -// mergeBoringNotify merges new notify 'src' into possibly-nil 'dst', -// either mutating 'dst' or allocating a new one if 'dst' is nil, +// mergeBoringNotify merges new notify src into possibly-nil dst, +// either mutating dst or allocating a new one if dst is nil, // returning the merged result. // // dst and src must both be "boring" (i.e. not notable per isNotifiableNotify). @@ -127,6 +128,9 @@ func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify { } if src.NetMap != nil { dst.NetMap = src.NetMap + dst.PeerChanges = nil // full netmap supersedes any accumulated deltas + } else if src.PeerChanges != nil { + dst.PeerChanges = mergePeerChanges(dst.PeerChanges, src.PeerChanges) } if src.Engine != nil { dst.Engine = src.Engine @@ -134,6 +138,55 @@ func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify { return dst } +// mergePeerChanges merges new peer changes from src into dst, either +// mutating dst or allocating a new slice if dst is nil, returning the merged result. +// Values in src override those in dst for the same NodeID. +func mergePeerChanges(dst, src []*tailcfg.PeerChange) []*tailcfg.PeerChange { + idxByNode := make(map[tailcfg.NodeID]int, len(dst)) + for i, d := range dst { + idxByNode[d.NodeID] = i + } + + for _, nd := range src { + if oi, ok := idxByNode[nd.NodeID]; ok { + dst[oi] = mergePeerChangeForIpnBus(dst[oi], nd) + continue + } + idxByNode[nd.NodeID] = len(dst) + dst = append(dst, nd) + } + return dst +} + +// mergePeerChangeForIpnBus merges new with old, returning the result. +// Fields set in new override those in old; fields only set in old are preserved. +func mergePeerChangeForIpnBus(old, new *tailcfg.PeerChange) *tailcfg.PeerChange { + merged := *old + + // This is a subset of PeerChange that reflects only the fields that can + // be changed via a NodeMutation. If future fields can be updated via + // NodeMutations from map responses (and they are relevant to the ipn bus), then + // they should be added here and merged in the same way. + if new.DERPRegion != 0 { + // netmap.NodeMutationDerpHome + merged.DERPRegion = new.DERPRegion + } + if new.Online != nil { + // netmap.NodeMutationOnline + merged.Online = new.Online + } + if new.LastSeen != nil { + // netmap.NodeMutationLastSeen + merged.LastSeen = new.LastSeen + } + if new.Endpoints != nil { + // netmap.NodeMutationEndpoints + merged.Endpoints = new.Endpoints + } + + return &merged +} + // isNotableNotify reports whether n is a "notable" notification that // should be sent on the IPN bus immediately (e.g. to GUIs) without // rate limiting it for a few seconds. diff --git a/ipn/ipnlocal/bus_test.go b/ipn/ipnlocal/bus_test.go index 47d13f305..8e4d3ede8 100644 --- a/ipn/ipnlocal/bus_test.go +++ b/ipn/ipnlocal/bus_test.go @@ -12,6 +12,7 @@ import ( "tailscale.com/drive" "tailscale.com/ipn" + "tailscale.com/tailcfg" "tailscale.com/tstest" "tailscale.com/tstime" "tailscale.com/types/logger" @@ -29,6 +30,7 @@ func TestIsNotableNotify(t *testing.T) { {"empty", &ipn.Notify{}, false}, {"version", &ipn.Notify{Version: "foo"}, false}, {"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false}, + {"peerchanges", &ipn.Notify{PeerChanges: []*tailcfg.PeerChange{{}}}, false}, {"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false}, } @@ -39,7 +41,7 @@ func TestIsNotableNotify(t *testing.T) { for sf := range rt.Fields() { n := &ipn.Notify{} switch sf.Name { - case "_", "NetMap", "Engine", "Version": + case "_", "NetMap", "PeerChanges", "Engine", "Version": // Already covered above or not applicable. continue case "DriveShares": @@ -217,3 +219,103 @@ func TestRateLimitingBusSender(t *testing.T) { st.s.Run(ctx, incoming) }) } + +func TestMergePeerChanges(t *testing.T) { + online := true + offline := false + + t.Run("no_overlap_appends", func(t *testing.T) { + old := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 1}, + } + new := []*tailcfg.PeerChange{ + {NodeID: 2, DERPRegion: 2}, + } + got := mergePeerChanges(old, new) + if len(got) != 2 { + t.Fatalf("len = %d; want 2", len(got)) + } + if got[0].NodeID != 1 || got[1].NodeID != 2 { + t.Errorf("got NodeIDs %d, %d; want 1, 2", got[0].NodeID, got[1].NodeID) + } + }) + + t.Run("overlap_merges", func(t *testing.T) { + old := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 1, Online: &online}, + {NodeID: 2, DERPRegion: 10}, + } + new := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 5, Online: &offline}, + } + got := mergePeerChanges(old, new) + if len(got) != 2 { + t.Fatalf("len = %d; want 2 (merged, not appended)", len(got)) + } + if got[0].DERPRegion != 5 { + t.Errorf("DERPRegion = %d; want 5 (from new)", got[0].DERPRegion) + } + if *got[0].Online != false { + t.Errorf("Online = %v; want false (from new)", *got[0].Online) + } + // Node 2 should be untouched. + if got[1].NodeID != 2 || got[1].DERPRegion != 10 { + t.Errorf("node 2 was modified unexpectedly") + } + }) + + t.Run("partial_overlap_merges_and_appends", func(t *testing.T) { + old := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 1}, + } + new := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 2}, + {NodeID: 3, DERPRegion: 30}, + } + got := mergePeerChanges(old, new) + if len(got) != 2 { + t.Fatalf("len = %d; want 2", len(got)) + } + if got[0].NodeID != 1 || got[0].DERPRegion != 2 { + t.Errorf("node 1: DERPRegion = %d; want 2", got[0].DERPRegion) + } + if got[1].NodeID != 3 || got[1].DERPRegion != 30 { + t.Errorf("node 3: DERPRegion = %d; want 30", got[1].DERPRegion) + } + }) + + t.Run("preserves_old_fields_on_merge", func(t *testing.T) { + old := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 1, Online: &online, Cap: 10}, + } + new := []*tailcfg.PeerChange{ + {NodeID: 1, Online: &offline}, + } + got := mergePeerChanges(old, new) + if len(got) != 1 { + t.Fatalf("len = %d; want 1", len(got)) + } + if got[0].DERPRegion != 1 { + t.Errorf("DERPRegion = %d; want 1 (preserved from old)", got[0].DERPRegion) + } + if got[0].Cap != 10 { + t.Errorf("Cap = %d; want 10 (preserved from old)", got[0].Cap) + } + if *got[0].Online != false { + t.Errorf("Online = %v; want false (from new)", *got[0].Online) + } + }) + + t.Run("nil_old", func(t *testing.T) { + new := []*tailcfg.PeerChange{ + {NodeID: 1, DERPRegion: 1}, + } + got := mergePeerChanges(nil, new) + if len(got) != 1 { + t.Fatalf("len = %d; want 1", len(got)) + } + if got[0].NodeID != 1 { + t.Errorf("NodeID = %d; want 1", got[0].NodeID) + } + }) +} diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 8e8b25f1c..610d1d7b5 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -152,6 +152,7 @@ type watchSession struct { owner ipnauth.Actor // or nil sessionID string cancel context.CancelFunc // to shut down the session + mask ipn.NotifyWatchOpt // watch options for this session } var ( @@ -2164,10 +2165,20 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo b.suggestExitNodeLocked() } - if cn.NetMap() != nil && mutationsAreWorthyOfTellingIPNBus(muts) { + if cn.NetMap() == nil { + b.logf("[unexpected] got node mutations but netmap is nil; mutations not applied") + return true + } - nm := cn.netMapWithPeers() - notify = &ipn.Notify{NetMap: nm} + if mutationsAreWorthyOfTellingIPNBus(muts) { + // The notifier will strip the netmap based on the watchOpts mask if the watcher + // has indicated it can handle PeerChanges. + notify = &ipn.Notify{NetMap: cn.netMapWithPeers()} + if peerChanges, ok := ipnBusPeerChangesFromNodeMutations(muts); ok { + notify.PeerChanges = peerChanges + } else { + b.logf("[unexpected] got mutations worthy of telling IPN bus but failed to convert to peer changes") + } } else if testenv.InTest() { // In tests, send an empty Notify as a wake-up so end-to-end // integration tests in another repo can check on the status of @@ -2215,6 +2226,39 @@ func mutationsAreWorthyOfRecalculatingSuggestedExitNode(muts []netmap.NodeMutati return false } +// ipnBusPeerChangesFromNodeMutations converts a slice of NodeMutations to a slice of +// *tailcfg.PeerChange for use in ipn.Notify.PeerChanges. +// Multiple mutations to the same node are merged into a single PeerChange. +// If we encounter any mutations that we cannot convert to a PeerChange, we return (nil, false) +// to indicate that the caller should send a Notify with the full netmap instead of +// trying to send granular peer changes. +func ipnBusPeerChangesFromNodeMutations(muts []netmap.NodeMutation) ([]*tailcfg.PeerChange, bool) { + byID := map[tailcfg.NodeID]*tailcfg.PeerChange{} + var ordered []*tailcfg.PeerChange + for _, m := range muts { + nid := m.NodeIDBeingMutated() + pc := byID[nid] + if pc == nil { + pc = &tailcfg.PeerChange{NodeID: nid} + byID[nid] = pc + ordered = append(ordered, pc) + } + switch v := m.(type) { + case netmap.NodeMutationOnline: + pc.Online = &v.Online + case netmap.NodeMutationLastSeen: + pc.LastSeen = &v.LastSeen + case netmap.NodeMutationDERPHome: + pc.DERPRegion = v.DERPRegion + case netmap.NodeMutationEndpoints: + pc.Endpoints = v.Endpoints + default: + return nil, false + } + } + return ordered, true +} + // mutationsAreWorthyOfTellingIPNBus reports whether any mutation type in muts is // worthy of spamming the IPN bus (the Windows & Mac GUIs, basically) to tell them // about the update. @@ -3212,6 +3256,7 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A owner: actor, sessionID: sessionID, cancel: cancel, + mask: mask, } mak.Set(&b.notifyWatchers, sessionID, session) b.mu.Unlock() @@ -3449,13 +3494,27 @@ func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget) } for _, sess := range b.notifyWatchers { - if recipient.match(sess.owner) { - select { - case sess.ch <- &n: - default: - // Drop the notification if the channel is full. + if !recipient.match(sess.owner) { + continue + } + nOut := &n + if n.PeerChanges != nil { + // Take a shallow copy of n so we can elide the PeerChanges or the Netmap + // based on the session's mask. + nOut = new(n) + if sess.mask&ipn.NotifyPeerChanges != 0 { + // Skip the full Netmap + nOut.NetMap = nil + } else { + // Skip the PeerChanges + nOut.PeerChanges = nil } } + select { + case sess.ch <- nOut: + default: + // Drop the notification if the channel is full. + } } }