magicsock, ipnlocal: revert eventbus-based node/filter updates, remove Synchronize hack

Restore synchronous method calls from LocalBackend to magicsock.Conn
for node views, filter, and delta mutations. The eventbus delivery
introduced in 8e6f63cf1 was invalid for these updates because
subsequent operations in the same call chain depend on magicsock
already having the current state. The Synchronize/settleEventBus
workaround was fragile and kept requiring more workarounds and
introducing new mystery bugs.

Since eventbus was added, we've since learned more about when to use
eventbus, and this wasn't one of the cases.

We can take another swing at using eventbus for netmap changes in a
future change.

Fixes #16369
Updates #18575 (likely fixes)

Change-Id: I79057cc9259993368bb1e350ff0e073adf6b9a8f
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2026-02-08 18:07:33 +00:00
committed by Brad Fitzpatrick
parent 086968c15b
commit dc1d811d48
5 changed files with 95 additions and 217 deletions
+42 -104
View File
@@ -177,9 +177,6 @@ type Conn struct {
connCtxCancel func() // closes connCtx
donec <-chan struct{} // connCtx.Done()'s to avoid context.cancelCtx.Done()'s mutex per call
// A publisher for synchronization points to ensure correct ordering of
// config changes between magicsock and wireguard.
syncPub *eventbus.Publisher[syncPoint]
allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
portUpdatePub *eventbus.Publisher[router.PortUpdate]
tsmpDiscoKeyAvailablePub *eventbus.Publisher[NewDiscoKeyAvailable]
@@ -362,11 +359,11 @@ type Conn struct {
netInfoLast *tailcfg.NetInfo
derpMap *tailcfg.DERPMap // nil (or zero regions/nodes) means DERP is disabled
self tailcfg.NodeView // from last onNodeViewsUpdate
peers views.Slice[tailcfg.NodeView] // from last onNodeViewsUpdate, sorted by Node.ID; Note: [netmap.NodeMutation]'s rx'd in onNodeMutationsUpdate are never applied
filt *filter.Filter // from last onFilterUpdate
self tailcfg.NodeView // from last SetNetworkMap
peers views.Slice[tailcfg.NodeView] // from last SetNetworkMap, sorted by Node.ID; Note: [netmap.NodeMutation]'s rx'd in UpdateNetmapDelta are never applied
filt *filter.Filter // from last SetFilter
relayClientEnabled bool // whether we can allocate UDP relay endpoints on UDP relay servers or receive CallMeMaybeVia messages from peers
lastFlags debugFlags // at time of last onNodeViewsUpdate
lastFlags debugFlags // at time of last SetNetworkMap
privateKey key.NodePrivate // WireGuard private key for this node
everHadKey bool // whether we ever had a non-zero private key
myDerp int // nearest DERP region ID; 0 means none/unknown
@@ -521,47 +518,6 @@ func (o *Options) derpActiveFunc() func() {
return o.DERPActiveFunc
}
// NodeViewsUpdate represents an update event of [tailcfg.NodeView] for all
// nodes. This event is published over an [eventbus.Bus]. It may be published
// with an invalid SelfNode, and/or zero/nil Peers. [magicsock.Conn] is the sole
// subscriber as of 2025-06. If you are adding more subscribers consider moving
// this type out of magicsock.
type NodeViewsUpdate struct {
SelfNode tailcfg.NodeView
Peers []tailcfg.NodeView // sorted by Node.ID
}
// NodeMutationsUpdate represents an update event of one or more
// [netmap.NodeMutation]. This event is published over an [eventbus.Bus].
// [magicsock.Conn] is the sole subscriber as of 2025-06. If you are adding more
// subscribers consider moving this type out of magicsock.
type NodeMutationsUpdate struct {
Mutations []netmap.NodeMutation
}
// FilterUpdate represents an update event for a [*filter.Filter]. This event is
// signaled over an [eventbus.Bus]. [magicsock.Conn] is the sole subscriber as
// of 2025-06. If you are adding more subscribers consider moving this type out
// of magicsock.
type FilterUpdate struct {
*filter.Filter
}
// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize].
// It serves as a synchronization point, allowing to wait until magicsock
// has processed all pending events.
type syncPoint chan struct{}
// Wait blocks until [syncPoint.Signal] is called.
func (s syncPoint) Wait() {
<-s
}
// Signal signals the sync point, unblocking the [syncPoint.Wait] call.
func (s syncPoint) Signal() {
close(s)
}
// UDPRelayAllocReq represents a [*disco.AllocateUDPRelayEndpointRequest]
// reception event. This is signaled over an [eventbus.Bus] from
// [magicsock.Conn] towards [relayserver.extension].
@@ -654,21 +610,6 @@ func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
}
}
// Synchronize waits for all [eventbus] events published
// prior to this call to be processed by the receiver.
func (c *Conn) Synchronize() {
if c.syncPub == nil {
// Eventbus is not used; no need to synchronize (in certain tests).
return
}
sp := syncPoint(make(chan struct{}))
c.syncPub.Publish(sp)
select {
case <-sp:
case <-c.donec:
}
}
// NewConn creates a magic Conn listening on opts.Port.
// As the set of possible endpoints for a Conn changes, the
// callback opts.EndpointsFunc is called.
@@ -694,18 +635,10 @@ func NewConn(opts Options) (*Conn, error) {
// NewConn otherwise published events can be missed.
ec := c.eventBus.Client("magicsock.Conn")
c.eventClient = ec
c.syncPub = eventbus.Publish[syncPoint](ec)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
c.portUpdatePub = eventbus.Publish[router.PortUpdate](ec)
c.tsmpDiscoKeyAvailablePub = eventbus.Publish[NewDiscoKeyAvailable](ec)
eventbus.SubscribeFunc(ec, c.onPortMapChanged)
eventbus.SubscribeFunc(ec, c.onFilterUpdate)
eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate)
eventbus.SubscribeFunc(ec, func(sp syncPoint) {
c.dlogf("magicsock: received sync point after reconfig")
sp.Signal()
})
eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp)
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
@@ -2907,11 +2840,12 @@ func capVerIsRelayCapable(version tailcfg.CapabilityVersion) bool {
return version >= 121
}
// onFilterUpdate is called when a [FilterUpdate] is received over the
// [eventbus.Bus].
func (c *Conn) onFilterUpdate(f FilterUpdate) {
// SetFilter updates the packet filter used by the connection.
// It must be called synchronously from the caller's goroutine to ensure
// magicsock has the current filter before subsequent operations proceed.
func (c *Conn) SetFilter(f *filter.Filter) {
c.mu.Lock()
c.filt = f.Filter
c.filt = f
self := c.self
peers := c.peers
relayClientEnabled := c.relayClientEnabled
@@ -2924,7 +2858,7 @@ func (c *Conn) onFilterUpdate(f FilterUpdate) {
// The filter has changed, and we are operating as a relay server client.
// Re-evaluate it in order to produce an updated relay server set.
c.updateRelayServersSet(f.Filter, self, peers)
c.updateRelayServersSet(f, self, peers)
}
// updateRelayServersSet iterates all peers and self, evaluating filt for each
@@ -3015,21 +2949,24 @@ func (c *candidatePeerRelay) isValid() bool {
return !c.nodeKey.IsZero() && !c.discoKey.IsZero()
}
// onNodeViewsUpdate is called when a [NodeViewsUpdate] is received over the
// [eventbus.Bus].
func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) {
peersChanged := c.updateNodes(update)
// SetNetworkMap updates the network map with the given self node and peers.
// It must be called synchronously from the caller's goroutine to ensure
// magicsock has the current state before subsequent operations proceed.
//
// self may be invalid if there's no network map.
func (c *Conn) SetNetworkMap(self tailcfg.NodeView, peers []tailcfg.NodeView) {
peersChanged := c.updateNodes(self, peers)
relayClientEnabled := update.SelfNode.Valid() &&
!update.SelfNode.HasCap(tailcfg.NodeAttrDisableRelayClient) &&
!update.SelfNode.HasCap(tailcfg.NodeAttrOnlyTCP443)
relayClientEnabled := self.Valid() &&
!self.HasCap(tailcfg.NodeAttrDisableRelayClient) &&
!self.HasCap(tailcfg.NodeAttrOnlyTCP443)
c.mu.Lock()
relayClientChanged := c.relayClientEnabled != relayClientEnabled
c.relayClientEnabled = relayClientEnabled
filt := c.filt
self := c.self
peers := c.peers
selfView := c.self
peersView := c.peers
isClosed := c.closed
c.mu.Unlock() // release c.mu before potentially calling c.updateRelayServersSet which is O(m * n)
@@ -3042,15 +2979,14 @@ func (c *Conn) onNodeViewsUpdate(update NodeViewsUpdate) {
c.relayManager.handleRelayServersSet(nil)
c.hasPeerRelayServers.Store(false)
} else {
c.updateRelayServersSet(filt, self, peers)
c.updateRelayServersSet(filt, selfView, peersView)
}
}
}
// updateNodes updates [Conn] to reflect the [tailcfg.NodeView]'s contained
// in update. It returns true if update.Peers was unequal to c.peers, otherwise
// false.
func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
// updateNodes updates [Conn] to reflect the given self node and peers.
// It reports whether the peers were changed from before.
func (c *Conn) updateNodes(self tailcfg.NodeView, peers []tailcfg.NodeView) (peersChanged bool) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -3059,11 +2995,11 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
}
priorPeers := c.peers
metricNumPeers.Set(int64(len(update.Peers)))
metricNumPeers.Set(int64(len(peers)))
// Update c.self & c.peers regardless, before the following early return.
c.self = update.SelfNode
curPeers := views.SliceOf(update.Peers)
c.self = self
curPeers := views.SliceOf(peers)
c.peers = curPeers
// [debugFlags] are mutable in [Conn.SetSilentDisco] &
@@ -3072,7 +3008,7 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
// [controlknobs.Knobs] are simply self [tailcfg.NodeCapability]'s. They are
// useful as a global view of notable feature toggles, but the magicsock
// setters are completely unnecessary as we have the same values right here
// (update.SelfNode.Capabilities) at a time they are considered most
// (self.Capabilities) at a time they are considered most
// up-to-date.
// TODO: mutate [debugFlags] here instead of in various [Conn] setters.
flags := c.debugFlagsLocked()
@@ -3088,16 +3024,16 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
c.lastFlags = flags
c.logf("[v1] magicsock: got updated network map; %d peers", len(update.Peers))
c.logf("[v1] magicsock: got updated network map; %d peers", len(peers))
entriesPerBuffer := debugRingBufferSize(len(update.Peers))
entriesPerBuffer := debugRingBufferSize(len(peers))
// Try a pass of just upserting nodes and creating missing
// endpoints. If the set of nodes is the same, this is an
// efficient alloc-free update. If the set of nodes is different,
// we'll fall through to the next pass, which allocates but can
// handle full set updates.
for _, n := range update.Peers {
for _, n := range peers {
if n.ID() == 0 {
devPanicf("node with zero ID")
continue
@@ -3197,14 +3133,14 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
c.peerMap.upsertEndpoint(ep, key.DiscoPublic{})
}
// If the set of nodes changed since the last onNodeViewsUpdate, the
// If the set of nodes changed since the last SetNetworkMap, the
// upsert loop just above made c.peerMap contain the union of the
// old and new peers - which will be larger than the set from the
// current netmap. If that happens, go through the allocful
// deletion path to clean up moribund nodes.
if c.peerMap.nodeCount() != len(update.Peers) {
if c.peerMap.nodeCount() != len(peers) {
keep := set.Set[key.NodePublic]{}
for _, n := range update.Peers {
for _, n := range peers {
keep.Add(n.Key())
}
c.peerMap.forEachEndpoint(func(ep *endpoint) {
@@ -3739,13 +3675,15 @@ func simpleDur(d time.Duration) time.Duration {
return d.Round(time.Minute)
}
// onNodeMutationsUpdate is called when a [NodeMutationsUpdate] is received over
// the [eventbus.Bus]. Note: It does not apply these mutations to c.peers.
func (c *Conn) onNodeMutationsUpdate(update NodeMutationsUpdate) {
// UpdateNetmapDelta applies the given node mutations to the connection's peer
// state. It must be called synchronously from the caller's goroutine to ensure
// magicsock has the current state before subsequent operations proceed.
// Note: It does not apply these mutations to c.peers.
func (c *Conn) UpdateNetmapDelta(muts []netmap.NodeMutation) {
c.mu.Lock()
defer c.mu.Unlock()
for _, m := range update.Mutations {
for _, m := range muts {
nodeID := m.NodeIDBeingMutated()
ep, ok := c.peerMap.endpointForNodeID(nodeID)
if !ok {
+39 -68
View File
@@ -171,7 +171,7 @@ type magicStack struct {
}
// newMagicStack builds and initializes an idle magicsock and
// friends. You need to call conn.onNodeViewsUpdate and dev.Reconfig
// friends. You need to call conn.SetNetworkMap and dev.Reconfig
// before anything interesting happens.
func newMagicStack(t testing.TB, logf logger.Logf, ln nettype.PacketListener, derpMap *tailcfg.DERPMap) *magicStack {
privateKey := key.NewNode()
@@ -346,13 +346,9 @@ func meshStacks(logf logger.Logf, mutateNetmap func(idx int, nm *netmap.NetworkM
for i, m := range ms {
nm := buildNetmapLocked(i)
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
peerSet := make(set.Set[key.NodePublic], len(nv.Peers))
for _, peer := range nv.Peers {
m.conn.SetNetworkMap(nm.SelfNode, nm.Peers)
peerSet := make(set.Set[key.NodePublic], len(nm.Peers))
for _, peer := range nm.Peers {
peerSet.Add(peer.Key())
}
m.conn.UpdatePeers(peerSet)
@@ -1388,16 +1384,14 @@ func addTestEndpoint(tb testing.TB, conn *Conn, sendConn net.PacketConn) (key.No
// codepath.
discoKey := key.DiscoPublicFromRaw32(mem.B([]byte{31: 1}))
nodeKey := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 31: 0}))
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews([]*tailcfg.Node{
{
ID: 1,
Key: nodeKey,
DiscoKey: discoKey,
Endpoints: eps(sendConn.LocalAddr().String()),
},
}),
})
conn.SetNetworkMap(tailcfg.NodeView{}, nodeViews([]*tailcfg.Node{
{
ID: 1,
Key: nodeKey,
DiscoKey: discoKey,
Endpoints: eps(sendConn.LocalAddr().String()),
},
}))
conn.SetPrivateKey(key.NodePrivateFromRaw32(mem.B([]byte{0: 1, 31: 0})))
_, err := conn.ParseEndpoint(nodeKey.UntypedHexString())
if err != nil {
@@ -1581,7 +1575,7 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView {
// doesn't change its disco key doesn't result in a broken state.
//
// https://github.com/tailscale/tailscale/issues/1391
func TestOnNodeViewsUpdateChangingNodeKey(t *testing.T) {
func TestSetNetworkMapChangingNodeKey(t *testing.T) {
conn := newTestConn(t)
t.Cleanup(func() { conn.Close() })
var buf tstest.MemLogger
@@ -1593,32 +1587,28 @@ func TestOnNodeViewsUpdateChangingNodeKey(t *testing.T) {
nodeKey1 := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 2: '1', 31: 0}))
nodeKey2 := key.NodePublicFromRaw32(mem.B([]byte{0: 'N', 1: 'K', 2: '2', 31: 0}))
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews([]*tailcfg.Node{
{
ID: 1,
Key: nodeKey1,
DiscoKey: discoKey,
Endpoints: eps("192.168.1.2:345"),
},
}),
})
conn.SetNetworkMap(tailcfg.NodeView{}, nodeViews([]*tailcfg.Node{
{
ID: 1,
Key: nodeKey1,
DiscoKey: discoKey,
Endpoints: eps("192.168.1.2:345"),
},
}))
_, err := conn.ParseEndpoint(nodeKey1.UntypedHexString())
if err != nil {
t.Fatal(err)
}
for range 3 {
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews([]*tailcfg.Node{
{
ID: 2,
Key: nodeKey2,
DiscoKey: discoKey,
Endpoints: eps("192.168.1.2:345"),
},
}),
})
conn.SetNetworkMap(tailcfg.NodeView{}, nodeViews([]*tailcfg.Node{
{
ID: 2,
Key: nodeKey2,
DiscoKey: discoKey,
Endpoints: eps("192.168.1.2:345"),
},
}))
}
de, ok := conn.peerMap.endpointForNodeKey(nodeKey2)
@@ -1932,7 +1922,7 @@ func eps(s ...string) []netip.AddrPort {
return eps
}
func TestStressOnNodeViewsUpdate(t *testing.T) {
func TestStressSetNetworkMap(t *testing.T) {
t.Parallel()
conn := newTestConn(t)
@@ -1988,9 +1978,7 @@ func TestStressOnNodeViewsUpdate(t *testing.T) {
}
}
// Set the node views.
conn.onNodeViewsUpdate(NodeViewsUpdate{
Peers: nodeViews(peers),
})
conn.SetNetworkMap(tailcfg.NodeView{}, nodeViews(peers))
// Check invariants.
if err := conn.peerMap.validate(); err != nil {
t.Error(err)
@@ -2113,10 +2101,10 @@ func TestRebindingUDPConn(t *testing.T) {
}
// https://github.com/tailscale/tailscale/issues/6680: don't ignore
// onNodeViewsUpdate calls when there are no peers. (A too aggressive fast path was
// SetNetworkMap calls when there are no peers. (A too aggressive fast path was
// previously bailing out early, thinking there were no changes since all zero
// peers didn't change, but the node views has non-peer info in it too we shouldn't discard)
func TestOnNodeViewsUpdateWithNoPeers(t *testing.T) {
func TestSetNetworkMapWithNoPeers(t *testing.T) {
var c Conn
knobs := &controlknobs.Knobs{}
c.logf = logger.Discard
@@ -2125,9 +2113,7 @@ func TestOnNodeViewsUpdateWithNoPeers(t *testing.T) {
for i := 1; i <= 3; i++ {
v := !debugEnableSilentDisco()
envknob.Setenv("TS_DEBUG_ENABLE_SILENT_DISCO", fmt.Sprint(v))
nv := NodeViewsUpdate{}
c.onNodeViewsUpdate(nv)
t.Logf("ptr %d: %p", i, nv)
c.SetNetworkMap(tailcfg.NodeView{}, nil)
if c.lastFlags.heartbeatDisabled != v {
t.Fatalf("call %d: didn't store netmap", i)
}
@@ -2215,11 +2201,7 @@ func TestIsWireGuardOnlyPeer(t *testing.T) {
},
}),
}
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
m.conn.SetNetworkMap(nm.SelfNode, nm.Peers)
cfg, err := nmcfg.WGCfg(m.privateKey, nm, t.Logf, netmap.AllowSubnetRoutes, "")
if err != nil {
@@ -2280,11 +2262,7 @@ func TestIsWireGuardOnlyPeerWithMasquerade(t *testing.T) {
},
}),
}
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
m.conn.SetNetworkMap(nm.SelfNode, nm.Peers)
cfg, err := nmcfg.WGCfg(m.privateKey, nm, t.Logf, netmap.AllowSubnetRoutes, "")
if err != nil {
@@ -2321,11 +2299,7 @@ func TestIsWireGuardOnlyPeerWithMasquerade(t *testing.T) {
// configures WG.
func applyNetworkMap(t *testing.T, m *magicStack, nm *netmap.NetworkMap) {
t.Helper()
nv := NodeViewsUpdate{
SelfNode: nm.SelfNode,
Peers: nm.Peers,
}
m.conn.onNodeViewsUpdate(nv)
m.conn.SetNetworkMap(nm.SelfNode, nm.Peers)
// Make sure we can't use v6 to avoid test failures.
m.conn.noV6.Store(true)
@@ -3590,7 +3564,7 @@ func Test_nodeHasCap(t *testing.T) {
}
}
func TestConn_onNodeViewsUpdate_updateRelayServersSet(t *testing.T) {
func TestConn_SetNetworkMap_updateRelayServersSet(t *testing.T) {
peerNodeCandidateRelay := &tailcfg.Node{
Cap: 121,
ID: 1,
@@ -3752,10 +3726,7 @@ func TestConn_onNodeViewsUpdate_updateRelayServersSet(t *testing.T) {
c.hasPeerRelayServers.Store(true)
}
c.onNodeViewsUpdate(NodeViewsUpdate{
SelfNode: tt.self,
Peers: tt.peers,
})
c.SetNetworkMap(tt.self, tt.peers)
got := c.relayManager.getServers()
if !got.Equal(tt.wantRelayServers) {
t.Fatalf("got: %v != want: %v", got, tt.wantRelayServers)