wgengine/magicsock: replace peers slice with peersByID map; add Upsert/RemovePeer
Replace Conn.peers (sorted views.Slice) with peersByID, a map[tailcfg.NodeID]tailcfg.NodeView. The only caller that needed the sorted slice (the disco message receive path's binary search) becomes a single map lookup. Drop nodesEqual. Add Conn.UpsertPeer / Conn.RemovePeer for O(1) single-peer endpoint work. RemovePeer also performs a targeted single-disco-key cleanup (previously that scan was O(discoInfo)). Extract the shared per-peer upsert body as upsertPeerLocked; still used by SetNetworkMap's bulk path. SetNetworkMap is documented as the bulk / initial / self-change path; UpsertPeer and RemovePeer are preferred for single-peer changes. Make the relay server set update O(1) per peer: add serverUpsertCh / serverRemoveCh to relayManager with matching run-loop handlers. UpsertPeer / RemovePeer evaluate the per-peer relay predicate locally and dispatch upsert or remove. The full-rebuild updateRelayServersSet stays for the initial netmap, filter changes, and fallback. Move the hasPeerRelayServers atomic from Conn onto relayManager, next to the serversByNodeKey map it summarizes. The run loop is now the single writer and needs no back-pointer to Conn; endpoint's two hot-path readers take one extra hop to de.c.relayManager.hasPeerRelayServers but the cost is the same atomic load. No callers use UpsertPeer/RemovePeer yet; a subsequent change will plumb per-peer add/remove through the incremental map update path. Updates #12542 Change-Id: If6a3442fe29ccbd77890ea61b754a4d1ad6ef225 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
committed by
Brad Fitzpatrick
parent
f289f7e77c
commit
311dd3839d
@@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"tailscale.com/disco"
|
||||
@@ -34,6 +35,14 @@ import (
|
||||
type relayManager struct {
|
||||
initOnce sync.Once
|
||||
|
||||
// hasPeerRelayServers is whether relayManager is configured with at
|
||||
// least one peer relay server via [relayManager.handleRelayServersSet]
|
||||
// (or per-peer variants). Exposed as an atomic so [endpoint] hot paths
|
||||
// can short-circuit when there are no relay servers without taking any
|
||||
// lock or entering the run loop. Written only from runLoop() via
|
||||
// [relayManager.publishHasServersRunLoop].
|
||||
hasPeerRelayServers atomic.Bool
|
||||
|
||||
// ===================================================================
|
||||
// The following fields are owned by a single goroutine, runLoop().
|
||||
serversByNodeKey map[key.NodePublic]candidatePeerRelay
|
||||
@@ -56,6 +65,8 @@ type relayManager struct {
|
||||
newServerEndpointCh chan newRelayServerEndpointEvent
|
||||
rxDiscoMsgCh chan relayDiscoMsgEvent
|
||||
serversCh chan set.Set[candidatePeerRelay]
|
||||
serverUpsertCh chan candidatePeerRelay
|
||||
serverRemoveCh chan key.NodePublic
|
||||
getServersCh chan chan set.Set[candidatePeerRelay]
|
||||
derpHomeChangeCh chan derpHomeChangeEvent
|
||||
|
||||
@@ -228,6 +239,16 @@ func (r *relayManager) runLoop() {
|
||||
if !r.hasActiveWorkRunLoop() {
|
||||
return
|
||||
}
|
||||
case upsert := <-r.serverUpsertCh:
|
||||
r.handleServerUpsertRunLoop(upsert)
|
||||
if !r.hasActiveWorkRunLoop() {
|
||||
return
|
||||
}
|
||||
case nk := <-r.serverRemoveCh:
|
||||
r.handleServerRemoveRunLoop(nk)
|
||||
if !r.hasActiveWorkRunLoop() {
|
||||
return
|
||||
}
|
||||
case getServersCh := <-r.getServersCh:
|
||||
r.handleGetServersRunLoop(getServersCh)
|
||||
if !r.hasActiveWorkRunLoop() {
|
||||
@@ -265,6 +286,34 @@ func (r *relayManager) handleServersUpdateRunLoop(update set.Set[candidatePeerRe
|
||||
for _, v := range update.Slice() {
|
||||
r.serversByNodeKey[v.nodeKey] = v
|
||||
}
|
||||
r.publishHasServersRunLoop()
|
||||
}
|
||||
|
||||
// handleServerUpsertRunLoop inserts or updates cp in serversByNodeKey. It is
|
||||
// the per-peer analog of [relayManager.handleServersUpdateRunLoop] used by
|
||||
// [Conn.UpsertPeer].
|
||||
func (r *relayManager) handleServerUpsertRunLoop(cp candidatePeerRelay) {
|
||||
r.serversByNodeKey[cp.nodeKey] = cp
|
||||
r.publishHasServersRunLoop()
|
||||
}
|
||||
|
||||
// handleServerRemoveRunLoop deletes nk from serversByNodeKey. It is a no-op
|
||||
// if nk isn't currently a known server. It is the per-peer analog of
|
||||
// [relayManager.handleServersUpdateRunLoop] used by [Conn.RemovePeer] and by
|
||||
// [Conn.UpsertPeer] when a peer is upserted with fields that make it no
|
||||
// longer a relay candidate.
|
||||
func (r *relayManager) handleServerRemoveRunLoop(nk key.NodePublic) {
|
||||
if _, ok := r.serversByNodeKey[nk]; !ok {
|
||||
return
|
||||
}
|
||||
delete(r.serversByNodeKey, nk)
|
||||
r.publishHasServersRunLoop()
|
||||
}
|
||||
|
||||
// publishHasServersRunLoop updates [relayManager.hasPeerRelayServers] to
|
||||
// reflect whether any relay servers are currently known.
|
||||
func (r *relayManager) publishHasServersRunLoop() {
|
||||
r.hasPeerRelayServers.Store(len(r.serversByNodeKey) > 0)
|
||||
}
|
||||
|
||||
type relayDiscoMsgEvent struct {
|
||||
@@ -330,6 +379,8 @@ func (r *relayManager) init() {
|
||||
r.newServerEndpointCh = make(chan newRelayServerEndpointEvent)
|
||||
r.rxDiscoMsgCh = make(chan relayDiscoMsgEvent)
|
||||
r.serversCh = make(chan set.Set[candidatePeerRelay])
|
||||
r.serverUpsertCh = make(chan candidatePeerRelay)
|
||||
r.serverRemoveCh = make(chan key.NodePublic)
|
||||
r.getServersCh = make(chan chan set.Set[candidatePeerRelay])
|
||||
r.derpHomeChangeCh = make(chan derpHomeChangeEvent)
|
||||
r.runLoopStoppedCh = make(chan struct{}, 1)
|
||||
@@ -436,6 +487,21 @@ func (r *relayManager) handleRelayServersSet(servers set.Set[candidatePeerRelay]
|
||||
relayManagerInputEvent(r, nil, &r.serversCh, servers)
|
||||
}
|
||||
|
||||
// handleRelayServerUpsert is the O(1) per-peer variant of
|
||||
// [relayManager.handleRelayServersSet]: it inserts or updates a single
|
||||
// relay server entry.
|
||||
func (r *relayManager) handleRelayServerUpsert(cp candidatePeerRelay) {
|
||||
relayManagerInputEvent(r, nil, &r.serverUpsertCh, cp)
|
||||
}
|
||||
|
||||
// handleRelayServerRemove is the O(1) per-peer variant of
|
||||
// [relayManager.handleRelayServersSet]: it removes a single relay server
|
||||
// entry by node key. It is a no-op if nk is not currently a known relay
|
||||
// server.
|
||||
func (r *relayManager) handleRelayServerRemove(nk key.NodePublic) {
|
||||
relayManagerInputEvent(r, nil, &r.serverRemoveCh, nk)
|
||||
}
|
||||
|
||||
// relayManagerInputEvent initializes [relayManager] if necessary, starts
|
||||
// relayManager.runLoop() if it is not running, and writes 'event' on 'eventCh'.
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user