feature/relayserver,ipn/ipnlocal,net/udprelay: plumb DERPMap (#17881)

This commit replaces usage of local.Client in net/udprelay with DERPMap
plumbing over the eventbus. This has been a longstanding TODO. This work
was also accelerated by a memory leak in net/http when using
local.Client over long periods of time. So, this commit also addresses
said leak.

Updates #17801

Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
Jordan Whited
2025-11-13 20:57:48 -08:00
committed by GitHub
parent 146ea42822
commit 9e4d1fd87f
4 changed files with 324 additions and 169 deletions
+109 -111
View File
@@ -21,8 +21,10 @@ import (
"tailscale.com/ipn/ipnext"
"tailscale.com/ipn/localapi"
"tailscale.com/net/udprelay"
"tailscale.com/net/udprelay/endpoint"
"tailscale.com/net/udprelay/status"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/ptr"
"tailscale.com/util/eventbus"
@@ -68,25 +70,41 @@ func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *
// extension. It is registered with [ipnext.RegisterExtension] if the package is
// imported.
func newExtension(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, error) {
return &extension{
e := &extension{
newServerFn: func(logf logger.Logf, port int, overrideAddrs []netip.Addr) (relayServer, error) {
return udprelay.NewServer(logf, port, overrideAddrs)
},
logf: logger.WithPrefix(logf, featureName+": "),
bus: sb.Sys().Bus.Get(),
}, nil
}
e.ec = sb.Sys().Bus.Get().Client("relayserver.extension")
e.respPub = eventbus.Publish[magicsock.UDPRelayAllocResp](e.ec)
eventbus.SubscribeFunc(e.ec, e.onDERPMapView)
eventbus.SubscribeFunc(e.ec, e.onAllocReq)
return e, nil
}
// relayServer is an interface for [udprelay.Server].
type relayServer interface {
Close() error
AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.ServerEndpoint, error)
GetSessions() []status.ServerSession
SetDERPMapView(tailcfg.DERPMapView)
}
// extension is an [ipnext.Extension] managing the relay server on platforms
// that import this package.
type extension struct {
logf logger.Logf
bus *eventbus.Bus
newServerFn func(logf logger.Logf, port int, overrideAddrs []netip.Addr) (relayServer, error) // swappable for tests
logf logger.Logf
ec *eventbus.Client
respPub *eventbus.Publisher[magicsock.UDPRelayAllocResp]
mu sync.Mutex // guards the following fields
shutdown bool
port *int // ipn.Prefs.RelayServerPort, nil if disabled
eventSubs *eventbus.Monitor // nil if not connected to eventbus
debugSessionsCh chan chan []status.ServerSession // non-nil if consumeEventbusTopics is running
hasNodeAttrDisableRelayServer bool // tailcfg.NodeAttrDisableRelayServer
mu sync.Mutex // guards the following fields
shutdown bool // true if Shutdown() has been called
rs relayServer // nil when disabled
port *int // ipn.Prefs.RelayServerPort, nil if disabled
derpMapView tailcfg.DERPMapView // latest seen over the eventbus
hasNodeAttrDisableRelayServer bool // [tailcfg.NodeAttrDisableRelayServer]
}
// Name implements [ipnext.Extension].
@@ -104,26 +122,83 @@ func (e *extension) Init(host ipnext.Host) error {
return nil
}
// handleBusLifetimeLocked handles the lifetime of consumeEventbusTopics.
func (e *extension) handleBusLifetimeLocked() {
busShouldBeRunning := !e.shutdown && e.port != nil && !e.hasNodeAttrDisableRelayServer
if !busShouldBeRunning {
e.disconnectFromBusLocked()
func (e *extension) onDERPMapView(view tailcfg.DERPMapView) {
e.mu.Lock()
defer e.mu.Unlock()
e.derpMapView = view
if e.rs != nil {
e.rs.SetDERPMapView(view)
}
}
func (e *extension) onAllocReq(req magicsock.UDPRelayAllocReq) {
e.mu.Lock()
defer e.mu.Unlock()
if e.shutdown {
return
} else if e.eventSubs != nil {
}
if e.rs == nil {
if !e.relayServerShouldBeRunningLocked() {
return
}
e.tryStartRelayServerLocked()
if e.rs == nil {
return
}
}
se, err := e.rs.AllocateEndpoint(req.Message.ClientDisco[0], req.Message.ClientDisco[1])
if err != nil {
e.logf("error allocating endpoint: %v", err)
return
}
e.respPub.Publish(magicsock.UDPRelayAllocResp{
ReqRxFromNodeKey: req.RxFromNodeKey,
ReqRxFromDiscoKey: req.RxFromDiscoKey,
Message: &disco.AllocateUDPRelayEndpointResponse{
Generation: req.Message.Generation,
UDPRelayEndpoint: disco.UDPRelayEndpoint{
ServerDisco: se.ServerDisco,
ClientDisco: se.ClientDisco,
LamportID: se.LamportID,
VNI: se.VNI,
BindLifetime: se.BindLifetime.Duration,
SteadyStateLifetime: se.SteadyStateLifetime.Duration,
AddrPorts: se.AddrPorts,
},
},
})
}
func (e *extension) tryStartRelayServerLocked() {
rs, err := e.newServerFn(e.logf, *e.port, overrideAddrs())
if err != nil {
e.logf("error initializing server: %v", err)
return
}
e.rs = rs
e.rs.SetDERPMapView(e.derpMapView)
}
func (e *extension) relayServerShouldBeRunningLocked() bool {
return !e.shutdown && e.port != nil && !e.hasNodeAttrDisableRelayServer
}
// handleRelayServerLifetimeLocked handles the lifetime of [e.rs].
func (e *extension) handleRelayServerLifetimeLocked() {
if !e.relayServerShouldBeRunningLocked() {
e.stopRelayServerLocked()
return
} else if e.rs != nil {
return // already running
}
ec := e.bus.Client("relayserver.extension")
e.debugSessionsCh = make(chan chan []status.ServerSession)
e.eventSubs = ptr.To(ec.Monitor(e.consumeEventbusTopics(ec, *e.port)))
e.tryStartRelayServerLocked()
}
func (e *extension) selfNodeViewChanged(nodeView tailcfg.NodeView) {
e.mu.Lock()
defer e.mu.Unlock()
e.hasNodeAttrDisableRelayServer = nodeView.HasCap(tailcfg.NodeAttrDisableRelayServer)
e.handleBusLifetimeLocked()
e.handleRelayServerLifetimeLocked()
}
func (e *extension) profileStateChanged(_ ipn.LoginProfileView, prefs ipn.PrefsView, sameNode bool) {
@@ -133,13 +208,13 @@ func (e *extension) profileStateChanged(_ ipn.LoginProfileView, prefs ipn.PrefsV
enableOrDisableServer := ok != (e.port != nil)
portChanged := ok && e.port != nil && newPort != *e.port
if enableOrDisableServer || portChanged || !sameNode {
e.disconnectFromBusLocked()
e.stopRelayServerLocked()
e.port = nil
if ok {
e.port = ptr.To(newPort)
}
}
e.handleBusLifetimeLocked()
e.handleRelayServerLifetimeLocked()
}
// overrideAddrs returns TS_DEBUG_RELAY_SERVER_ADDRS as []netip.Addr, if set. It
@@ -162,88 +237,20 @@ var overrideAddrs = sync.OnceValue(func() (ret []netip.Addr) {
return
})
// consumeEventbusTopics serves endpoint allocation requests over the eventbus.
// It also serves [relayServer] debug information on a channel.
// consumeEventbusTopics must never acquire [extension.mu], which can be held
// by other goroutines while waiting to receive on [extension.eventSubs] or the
// inner [extension.debugSessionsCh] channel.
func (e *extension) consumeEventbusTopics(ec *eventbus.Client, port int) func(*eventbus.Client) {
reqSub := eventbus.Subscribe[magicsock.UDPRelayAllocReq](ec)
respPub := eventbus.Publish[magicsock.UDPRelayAllocResp](ec)
debugSessionsCh := e.debugSessionsCh
return func(ec *eventbus.Client) {
rs, err := udprelay.NewServer(e.logf, port, overrideAddrs())
if err != nil {
e.logf("error initializing server: %v", err)
}
defer func() {
if rs != nil {
rs.Close()
}
}()
for {
select {
case <-ec.Done():
return
case respCh := <-debugSessionsCh:
if rs == nil {
respCh <- nil
continue
}
sessions := rs.GetSessions()
respCh <- sessions
case req := <-reqSub.Events():
if rs == nil {
// The server may have previously failed to initialize if
// the configured port was in use, try again.
rs, err = udprelay.NewServer(e.logf, port, overrideAddrs())
if err != nil {
e.logf("error initializing server: %v", err)
continue
}
}
se, err := rs.AllocateEndpoint(req.Message.ClientDisco[0], req.Message.ClientDisco[1])
if err != nil {
e.logf("error allocating endpoint: %v", err)
continue
}
respPub.Publish(magicsock.UDPRelayAllocResp{
ReqRxFromNodeKey: req.RxFromNodeKey,
ReqRxFromDiscoKey: req.RxFromDiscoKey,
Message: &disco.AllocateUDPRelayEndpointResponse{
Generation: req.Message.Generation,
UDPRelayEndpoint: disco.UDPRelayEndpoint{
ServerDisco: se.ServerDisco,
ClientDisco: se.ClientDisco,
LamportID: se.LamportID,
VNI: se.VNI,
BindLifetime: se.BindLifetime.Duration,
SteadyStateLifetime: se.SteadyStateLifetime.Duration,
AddrPorts: se.AddrPorts,
},
},
})
}
}
}
}
func (e *extension) disconnectFromBusLocked() {
if e.eventSubs != nil {
e.eventSubs.Close()
e.eventSubs = nil
e.debugSessionsCh = nil
func (e *extension) stopRelayServerLocked() {
if e.rs != nil {
e.rs.Close()
}
e.rs = nil
}
// Shutdown implements [ipnlocal.Extension].
func (e *extension) Shutdown() error {
e.mu.Lock()
defer e.mu.Unlock()
e.disconnectFromBusLocked()
e.shutdown = true
e.ec.Close()
e.stopRelayServerLocked()
return nil
}
@@ -253,23 +260,14 @@ func (e *extension) Shutdown() error {
func (e *extension) serverStatus() status.ServerStatus {
e.mu.Lock()
defer e.mu.Unlock()
st := status.ServerStatus{
UDPPort: nil,
Sessions: nil,
}
if e.port == nil || e.eventSubs == nil {
if e.rs == nil {
return st
}
st.UDPPort = ptr.To(*e.port)
ch := make(chan []status.ServerSession)
select {
case e.debugSessionsCh <- ch:
resp := <-ch
st.Sessions = resp
return st
case <-e.eventSubs.Done():
return st
}
st.Sessions = e.rs.GetSessions()
return st
}