ipn/ipnlocal: add netmap mutations to the ipn bus (#19120)

ipn/local: add netmap mutations to the ipn bus

updates tailscale/tailscale#1909

This adds a new new NotifyWatchOpt that allows watchers to
receive PeerChange events (derived from node mutations)
on the IPN bus in lieu of a complete netmap.  We'll continue
to send the full netmap for any map response that includes it,
but for  mutations, sending PeerChange events gives the client
the option to manage it's own models more selectively and cuts
way down on json serialization overhead.

On chatty tailnets, this will vastly reduce the amount of
chatter on the bus.

This change should be backwards compatible, it is
purely additive.  Clients that subscribe to NotifyNetmap will
get the full netmap for every delta.  New clients can
omit that and instead opt into NotifyPeerChanges.

Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
main
Jonathan Nobels 1 week ago committed by GitHub
parent 6b7caaf7ee
commit 03c3551ee5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 19
      ipn/backend.go
  2. 57
      ipn/ipnlocal/bus.go
  3. 104
      ipn/ipnlocal/bus_test.go
  4. 75
      ipn/ipnlocal/local.go

@ -87,6 +87,11 @@ const (
NotifyInitialSuggestedExitNode NotifyWatchOpt = 1 << 10 // if set, the first Notify message (sent immediately) will contain the current SuggestedExitNode if available
NotifyInitialClientVersion NotifyWatchOpt = 1 << 11 // if set, the first Notify message (sent immediately) will contain the current ClientVersion if available and if update checks are enabled
// NotifyPeerChanges, if set, causes netmap delta updates to be sent as [tailcfg.PeerChange] rather than a full NetMap.
// Full netmap responses from the control plane are still sent as a full NetMap. PeerChanges are only sent to sessions
// that have opted in to this mode.
NotifyPeerChanges NotifyWatchOpt = 1 << 12
)
// Notify is a communication from a backend (e.g. tailscaled) to a frontend
@ -112,8 +117,15 @@ type Notify struct {
State *State // if non-nil, the new or current IPN state
Prefs *PrefsView // if non-nil && Valid, the new or current preferences
NetMap *netmap.NetworkMap // if non-nil, the new or current netmap
Engine *EngineStatus // if non-nil, the new or current wireguard stats
BrowseToURL *string // if non-nil, UI should open a browser right now
// PeerChanges, if non-nil, is a list of [tailcfg.PeerChange] that have occurred since the last
// full netmap update. This is sent in lieu of a full NetMap when [NotifyPeerChanges] is set in
// the session's mask and a netmap update is derived from an incremental MapResponse.
// Full MapResponse updates from the control plane are sent as a full NetMap.
PeerChanges []*tailcfg.PeerChange `json:",omitzero"`
Engine *EngineStatus // if non-nil, the new or current wireguard stats
BrowseToURL *string // if non-nil, UI should open a browser right now
// FilesWaiting if non-nil means that files are buffered in
// the Tailscale daemon and ready for local transfer to the
@ -184,6 +196,9 @@ func (n Notify) String() string {
if n.NetMap != nil {
sb.WriteString("NetMap{...} ")
}
if n.PeerChanges != nil {
fmt.Fprintf(&sb, "PeerChanges(%d) ", len(n.PeerChanges))
}
if n.Engine != nil {
fmt.Fprintf(&sb, "wg=%v ", *n.Engine)
}

@ -8,6 +8,7 @@ import (
"time"
"tailscale.com/ipn"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
)
@ -116,8 +117,8 @@ func (s *rateLimitingBusSender) Run(ctx context.Context, ch <-chan *ipn.Notify)
}
}
// mergeBoringNotify merges new notify 'src' into possibly-nil 'dst',
// either mutating 'dst' or allocating a new one if 'dst' is nil,
// mergeBoringNotify merges new notify src into possibly-nil dst,
// either mutating dst or allocating a new one if dst is nil,
// returning the merged result.
//
// dst and src must both be "boring" (i.e. not notable per isNotifiableNotify).
@ -127,6 +128,9 @@ func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify {
}
if src.NetMap != nil {
dst.NetMap = src.NetMap
dst.PeerChanges = nil // full netmap supersedes any accumulated deltas
} else if src.PeerChanges != nil {
dst.PeerChanges = mergePeerChanges(dst.PeerChanges, src.PeerChanges)
}
if src.Engine != nil {
dst.Engine = src.Engine
@ -134,6 +138,55 @@ func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify {
return dst
}
// mergePeerChanges merges new peer changes from src into dst, either
// mutating dst or allocating a new slice if dst is nil, returning the merged result.
// Values in src override those in dst for the same NodeID.
func mergePeerChanges(dst, src []*tailcfg.PeerChange) []*tailcfg.PeerChange {
idxByNode := make(map[tailcfg.NodeID]int, len(dst))
for i, d := range dst {
idxByNode[d.NodeID] = i
}
for _, nd := range src {
if oi, ok := idxByNode[nd.NodeID]; ok {
dst[oi] = mergePeerChangeForIpnBus(dst[oi], nd)
continue
}
idxByNode[nd.NodeID] = len(dst)
dst = append(dst, nd)
}
return dst
}
// mergePeerChangeForIpnBus merges new with old, returning the result.
// Fields set in new override those in old; fields only set in old are preserved.
func mergePeerChangeForIpnBus(old, new *tailcfg.PeerChange) *tailcfg.PeerChange {
merged := *old
// This is a subset of PeerChange that reflects only the fields that can
// be changed via a NodeMutation. If future fields can be updated via
// NodeMutations from map responses (and they are relevant to the ipn bus), then
// they should be added here and merged in the same way.
if new.DERPRegion != 0 {
// netmap.NodeMutationDerpHome
merged.DERPRegion = new.DERPRegion
}
if new.Online != nil {
// netmap.NodeMutationOnline
merged.Online = new.Online
}
if new.LastSeen != nil {
// netmap.NodeMutationLastSeen
merged.LastSeen = new.LastSeen
}
if new.Endpoints != nil {
// netmap.NodeMutationEndpoints
merged.Endpoints = new.Endpoints
}
return &merged
}
// isNotableNotify reports whether n is a "notable" notification that
// should be sent on the IPN bus immediately (e.g. to GUIs) without
// rate limiting it for a few seconds.

@ -12,6 +12,7 @@ import (
"tailscale.com/drive"
"tailscale.com/ipn"
"tailscale.com/tailcfg"
"tailscale.com/tstest"
"tailscale.com/tstime"
"tailscale.com/types/logger"
@ -29,6 +30,7 @@ func TestIsNotableNotify(t *testing.T) {
{"empty", &ipn.Notify{}, false},
{"version", &ipn.Notify{Version: "foo"}, false},
{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
{"peerchanges", &ipn.Notify{PeerChanges: []*tailcfg.PeerChange{{}}}, false},
{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
}
@ -39,7 +41,7 @@ func TestIsNotableNotify(t *testing.T) {
for sf := range rt.Fields() {
n := &ipn.Notify{}
switch sf.Name {
case "_", "NetMap", "Engine", "Version":
case "_", "NetMap", "PeerChanges", "Engine", "Version":
// Already covered above or not applicable.
continue
case "DriveShares":
@ -217,3 +219,103 @@ func TestRateLimitingBusSender(t *testing.T) {
st.s.Run(ctx, incoming)
})
}
func TestMergePeerChanges(t *testing.T) {
online := true
offline := false
t.Run("no_overlap_appends", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1},
}
new := []*tailcfg.PeerChange{
{NodeID: 2, DERPRegion: 2},
}
got := mergePeerChanges(old, new)
if len(got) != 2 {
t.Fatalf("len = %d; want 2", len(got))
}
if got[0].NodeID != 1 || got[1].NodeID != 2 {
t.Errorf("got NodeIDs %d, %d; want 1, 2", got[0].NodeID, got[1].NodeID)
}
})
t.Run("overlap_merges", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1, Online: &online},
{NodeID: 2, DERPRegion: 10},
}
new := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 5, Online: &offline},
}
got := mergePeerChanges(old, new)
if len(got) != 2 {
t.Fatalf("len = %d; want 2 (merged, not appended)", len(got))
}
if got[0].DERPRegion != 5 {
t.Errorf("DERPRegion = %d; want 5 (from new)", got[0].DERPRegion)
}
if *got[0].Online != false {
t.Errorf("Online = %v; want false (from new)", *got[0].Online)
}
// Node 2 should be untouched.
if got[1].NodeID != 2 || got[1].DERPRegion != 10 {
t.Errorf("node 2 was modified unexpectedly")
}
})
t.Run("partial_overlap_merges_and_appends", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1},
}
new := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 2},
{NodeID: 3, DERPRegion: 30},
}
got := mergePeerChanges(old, new)
if len(got) != 2 {
t.Fatalf("len = %d; want 2", len(got))
}
if got[0].NodeID != 1 || got[0].DERPRegion != 2 {
t.Errorf("node 1: DERPRegion = %d; want 2", got[0].DERPRegion)
}
if got[1].NodeID != 3 || got[1].DERPRegion != 30 {
t.Errorf("node 3: DERPRegion = %d; want 30", got[1].DERPRegion)
}
})
t.Run("preserves_old_fields_on_merge", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1, Online: &online, Cap: 10},
}
new := []*tailcfg.PeerChange{
{NodeID: 1, Online: &offline},
}
got := mergePeerChanges(old, new)
if len(got) != 1 {
t.Fatalf("len = %d; want 1", len(got))
}
if got[0].DERPRegion != 1 {
t.Errorf("DERPRegion = %d; want 1 (preserved from old)", got[0].DERPRegion)
}
if got[0].Cap != 10 {
t.Errorf("Cap = %d; want 10 (preserved from old)", got[0].Cap)
}
if *got[0].Online != false {
t.Errorf("Online = %v; want false (from new)", *got[0].Online)
}
})
t.Run("nil_old", func(t *testing.T) {
new := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1},
}
got := mergePeerChanges(nil, new)
if len(got) != 1 {
t.Fatalf("len = %d; want 1", len(got))
}
if got[0].NodeID != 1 {
t.Errorf("NodeID = %d; want 1", got[0].NodeID)
}
})
}

@ -152,6 +152,7 @@ type watchSession struct {
owner ipnauth.Actor // or nil
sessionID string
cancel context.CancelFunc // to shut down the session
mask ipn.NotifyWatchOpt // watch options for this session
}
var (
@ -2164,10 +2165,20 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo
b.suggestExitNodeLocked()
}
if cn.NetMap() != nil && mutationsAreWorthyOfTellingIPNBus(muts) {
if cn.NetMap() == nil {
b.logf("[unexpected] got node mutations but netmap is nil; mutations not applied")
return true
}
nm := cn.netMapWithPeers()
notify = &ipn.Notify{NetMap: nm}
if mutationsAreWorthyOfTellingIPNBus(muts) {
// The notifier will strip the netmap based on the watchOpts mask if the watcher
// has indicated it can handle PeerChanges.
notify = &ipn.Notify{NetMap: cn.netMapWithPeers()}
if peerChanges, ok := ipnBusPeerChangesFromNodeMutations(muts); ok {
notify.PeerChanges = peerChanges
} else {
b.logf("[unexpected] got mutations worthy of telling IPN bus but failed to convert to peer changes")
}
} else if testenv.InTest() {
// In tests, send an empty Notify as a wake-up so end-to-end
// integration tests in another repo can check on the status of
@ -2215,6 +2226,39 @@ func mutationsAreWorthyOfRecalculatingSuggestedExitNode(muts []netmap.NodeMutati
return false
}
// ipnBusPeerChangesFromNodeMutations converts a slice of NodeMutations to a slice of
// *tailcfg.PeerChange for use in ipn.Notify.PeerChanges.
// Multiple mutations to the same node are merged into a single PeerChange.
// If we encounter any mutations that we cannot convert to a PeerChange, we return (nil, false)
// to indicate that the caller should send a Notify with the full netmap instead of
// trying to send granular peer changes.
func ipnBusPeerChangesFromNodeMutations(muts []netmap.NodeMutation) ([]*tailcfg.PeerChange, bool) {
byID := map[tailcfg.NodeID]*tailcfg.PeerChange{}
var ordered []*tailcfg.PeerChange
for _, m := range muts {
nid := m.NodeIDBeingMutated()
pc := byID[nid]
if pc == nil {
pc = &tailcfg.PeerChange{NodeID: nid}
byID[nid] = pc
ordered = append(ordered, pc)
}
switch v := m.(type) {
case netmap.NodeMutationOnline:
pc.Online = &v.Online
case netmap.NodeMutationLastSeen:
pc.LastSeen = &v.LastSeen
case netmap.NodeMutationDERPHome:
pc.DERPRegion = v.DERPRegion
case netmap.NodeMutationEndpoints:
pc.Endpoints = v.Endpoints
default:
return nil, false
}
}
return ordered, true
}
// mutationsAreWorthyOfTellingIPNBus reports whether any mutation type in muts is
// worthy of spamming the IPN bus (the Windows & Mac GUIs, basically) to tell them
// about the update.
@ -3212,6 +3256,7 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
owner: actor,
sessionID: sessionID,
cancel: cancel,
mask: mask,
}
mak.Set(&b.notifyWatchers, sessionID, session)
b.mu.Unlock()
@ -3449,13 +3494,27 @@ func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget)
}
for _, sess := range b.notifyWatchers {
if recipient.match(sess.owner) {
select {
case sess.ch <- &n:
default:
// Drop the notification if the channel is full.
if !recipient.match(sess.owner) {
continue
}
nOut := &n
if n.PeerChanges != nil {
// Take a shallow copy of n so we can elide the PeerChanges or the Netmap
// based on the session's mask.
nOut = new(n)
if sess.mask&ipn.NotifyPeerChanges != 0 {
// Skip the full Netmap
nOut.NetMap = nil
} else {
// Skip the PeerChanges
nOut.PeerChanges = nil
}
}
select {
case sess.ch <- nOut:
default:
// Drop the notification if the channel is full.
}
}
}

Loading…
Cancel
Save