feature/portlist: pull portlist service porting into extension, use eventbus
And yay: tsnet (and thus k8s-operator etc) no longer depends on portlist! And LocalBackend is smaller. Removes 50 KB from the minimal binary. Updates #12614 Change-Id: Iee04057053dc39305303e8bd1d9599db8368d926 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
committed by
Brad Fitzpatrick
parent
bbc5107d7d
commit
45d635cc98
@@ -372,6 +372,10 @@ type Hooks struct {
|
||||
// SetPeerStatus is called to mutate PeerStatus.
|
||||
// Callers must only use NodeBackend to read data.
|
||||
SetPeerStatus feature.Hooks[func(*ipnstate.PeerStatus, tailcfg.NodeView, NodeBackend)]
|
||||
|
||||
// ShouldUploadServices reports whether this node should include services
|
||||
// in Hostinfo from the portlist extension.
|
||||
ShouldUploadServices feature.Hook[func() bool]
|
||||
}
|
||||
|
||||
// NodeBackend is an interface to query the current node and its peers.
|
||||
@@ -398,4 +402,9 @@ type NodeBackend interface {
|
||||
// It effectively just reports whether PeerAPIBase(node) is non-empty, but
|
||||
// potentially more efficiently.
|
||||
PeerHasPeerAPI(tailcfg.NodeView) bool
|
||||
|
||||
// CollectServices reports whether the control plane is telling this
|
||||
// node that the portlist service collection is desirable, should it
|
||||
// choose to report them.
|
||||
CollectServices() bool
|
||||
}
|
||||
|
||||
+37
-94
@@ -61,7 +61,6 @@ import (
|
||||
"tailscale.com/ipn/ipnauth"
|
||||
"tailscale.com/ipn/ipnext"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/ipn/policy"
|
||||
"tailscale.com/log/sockstatlog"
|
||||
"tailscale.com/logpolicy"
|
||||
"tailscale.com/net/dns"
|
||||
@@ -77,7 +76,6 @@ import (
|
||||
"tailscale.com/net/tsaddr"
|
||||
"tailscale.com/net/tsdial"
|
||||
"tailscale.com/paths"
|
||||
"tailscale.com/portlist"
|
||||
"tailscale.com/posture"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tailcfg"
|
||||
@@ -211,12 +209,10 @@ type LocalBackend struct {
|
||||
pushDeviceToken syncs.AtomicValue[string]
|
||||
backendLogID logid.PublicID
|
||||
unregisterSysPolicyWatch func()
|
||||
portpoll *portlist.Poller // may be nil
|
||||
portpollOnce sync.Once // guards starting readPoller
|
||||
varRoot string // or empty if SetVarRoot never called
|
||||
logFlushFunc func() // or nil if SetLogFlusher wasn't called
|
||||
em *expiryManager // non-nil; TODO(nickkhyl): move to nodeBackend
|
||||
sshAtomicBool atomic.Bool // TODO(nickkhyl): move to nodeBackend
|
||||
varRoot string // or empty if SetVarRoot never called
|
||||
logFlushFunc func() // or nil if SetLogFlusher wasn't called
|
||||
em *expiryManager // non-nil; TODO(nickkhyl): move to nodeBackend
|
||||
sshAtomicBool atomic.Bool // TODO(nickkhyl): move to nodeBackend
|
||||
// webClientAtomicBool controls whether the web client is running. This should
|
||||
// be true unless the disable-web-client node attribute has been set.
|
||||
webClientAtomicBool atomic.Bool // TODO(nickkhyl): move to nodeBackend
|
||||
@@ -522,7 +518,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
|
||||
pm: pm,
|
||||
backendLogID: logID,
|
||||
state: ipn.NoState,
|
||||
portpoll: new(portlist.Poller),
|
||||
em: newExpiryManager(logf, sys.Bus.Get()),
|
||||
loginFlags: loginFlags,
|
||||
clock: clock,
|
||||
@@ -619,6 +614,12 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
|
||||
healthChangeSub := eventbus.Subscribe[health.Change](ec)
|
||||
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
|
||||
|
||||
var portlist <-chan PortlistServices
|
||||
if buildfeatures.HasPortList {
|
||||
portlistSub := eventbus.Subscribe[PortlistServices](ec)
|
||||
portlist = portlistSub.Events()
|
||||
}
|
||||
|
||||
return func(ec *eventbus.Client) {
|
||||
for {
|
||||
select {
|
||||
@@ -632,6 +633,10 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
|
||||
b.onHealthChange(change)
|
||||
case changeDelta := <-changeDeltaSub.Events():
|
||||
b.linkChange(&changeDelta)
|
||||
case pl := <-portlist:
|
||||
if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
|
||||
b.setPortlistServices(pl)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2300,15 +2305,6 @@ func (b *LocalBackend) SetControlClientGetterForTesting(newControlClient func(co
|
||||
b.ccGen = newControlClient
|
||||
}
|
||||
|
||||
// DisablePortPollerForTest disables the port list poller for tests.
|
||||
// It must be called before Start.
|
||||
func (b *LocalBackend) DisablePortPollerForTest() {
|
||||
testenv.AssertInTest()
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.portpoll = nil
|
||||
}
|
||||
|
||||
// PeersForTest returns all the current peers, sorted by Node.ID,
|
||||
// for integration tests in another repo.
|
||||
func (b *LocalBackend) PeersForTest() []tailcfg.NodeView {
|
||||
@@ -2457,12 +2453,6 @@ func (b *LocalBackend) Start(opts ipn.Options) error {
|
||||
persistv = new(persist.Persist)
|
||||
}
|
||||
|
||||
if b.portpoll != nil {
|
||||
b.portpollOnce.Do(func() {
|
||||
b.goTracker.Go(b.readPoller)
|
||||
})
|
||||
}
|
||||
|
||||
discoPublic := b.MagicConn().DiscoPublicKey()
|
||||
|
||||
var err error
|
||||
@@ -2906,57 +2896,6 @@ func shrinkDefaultRoute(route netip.Prefix, localInterfaceRoutes *netipx.IPSet,
|
||||
return b.IPSet()
|
||||
}
|
||||
|
||||
// readPoller is a goroutine that receives service lists from
|
||||
// b.portpoll and propagates them into the controlclient's HostInfo.
|
||||
func (b *LocalBackend) readPoller() {
|
||||
if !envknob.BoolDefaultTrue("TS_PORTLIST") {
|
||||
return
|
||||
}
|
||||
|
||||
ticker, tickerChannel := b.clock.NewTicker(portlist.PollInterval())
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-tickerChannel:
|
||||
case <-b.ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
if !b.shouldUploadServices() {
|
||||
continue
|
||||
}
|
||||
|
||||
ports, changed, err := b.portpoll.Poll()
|
||||
if err != nil {
|
||||
b.logf("error polling for open ports: %v", err)
|
||||
return
|
||||
}
|
||||
if !changed {
|
||||
continue
|
||||
}
|
||||
sl := []tailcfg.Service{}
|
||||
for _, p := range ports {
|
||||
s := tailcfg.Service{
|
||||
Proto: tailcfg.ServiceProto(p.Proto),
|
||||
Port: p.Port,
|
||||
Description: p.Process,
|
||||
}
|
||||
if policy.IsInterestingService(s, version.OS()) {
|
||||
sl = append(sl, s)
|
||||
}
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
if b.hostinfo == nil {
|
||||
b.hostinfo = new(tailcfg.Hostinfo)
|
||||
}
|
||||
b.hostinfo.Services = sl
|
||||
b.mu.Unlock()
|
||||
|
||||
b.doSetHostinfoFilterServices()
|
||||
}
|
||||
}
|
||||
|
||||
// GetPushDeviceToken returns the push notification device token.
|
||||
func (b *LocalBackend) GetPushDeviceToken() string {
|
||||
return b.pushDeviceToken.Load()
|
||||
@@ -3853,23 +3792,6 @@ func (b *LocalBackend) parseWgStatusLocked(s *wgengine.Status) (ret ipn.EngineSt
|
||||
return ret
|
||||
}
|
||||
|
||||
// shouldUploadServices reports whether this node should include services
|
||||
// in Hostinfo. When the user preferences currently request "shields up"
|
||||
// mode, all inbound connections are refused, so services are not reported.
|
||||
// Otherwise, shouldUploadServices respects NetMap.CollectServices.
|
||||
// TODO(nickkhyl): move this into [nodeBackend]?
|
||||
func (b *LocalBackend) shouldUploadServices() bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
p := b.pm.CurrentPrefs()
|
||||
nm := b.currentNode().NetMap()
|
||||
if !p.Valid() || nm == nil {
|
||||
return false // default to safest setting
|
||||
}
|
||||
return !p.ShieldsUp() && nm.CollectServices
|
||||
}
|
||||
|
||||
// SetCurrentUser is used to implement support for multi-user systems (only
|
||||
// Windows 2022-11-25). On such systems, the actor is used to determine which
|
||||
// user's state should be used. The current user is maintained by active
|
||||
@@ -4812,6 +4734,25 @@ func (b *LocalBackend) peerAPIServicesLocked() (ret []tailcfg.Service) {
|
||||
return ret
|
||||
}
|
||||
|
||||
// PortlistServices is an eventbus topic for the portlist extension
|
||||
// to advertise the running services on the host.
|
||||
type PortlistServices []tailcfg.Service
|
||||
|
||||
func (b *LocalBackend) setPortlistServices(sl []tailcfg.Service) {
|
||||
if !buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
|
||||
return
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
if b.hostinfo == nil {
|
||||
b.hostinfo = new(tailcfg.Hostinfo)
|
||||
}
|
||||
b.hostinfo.Services = sl
|
||||
b.mu.Unlock()
|
||||
|
||||
b.doSetHostinfoFilterServices()
|
||||
}
|
||||
|
||||
// doSetHostinfoFilterServices calls SetHostinfo on the controlclient,
|
||||
// possibly after mangling the given hostinfo.
|
||||
//
|
||||
@@ -4837,13 +4778,15 @@ func (b *LocalBackend) doSetHostinfoFilterServices() {
|
||||
|
||||
// TODO(maisem,bradfitz): store hostinfo as a view, not as a mutable struct.
|
||||
hi := *b.hostinfo // shallow copy
|
||||
unlock.UnlockEarly()
|
||||
|
||||
// Make a shallow copy of hostinfo so we can mutate
|
||||
// at the Service field.
|
||||
if !b.shouldUploadServices() {
|
||||
if f, ok := b.extHost.Hooks().ShouldUploadServices.GetOk(); !ok || !f() {
|
||||
hi.Services = []tailcfg.Service{}
|
||||
}
|
||||
|
||||
unlock.UnlockEarly()
|
||||
|
||||
// Don't mutate hi.Service's underlying array. Append to
|
||||
// the slice with no free capacity.
|
||||
c := len(hi.Services)
|
||||
|
||||
@@ -5816,7 +5816,6 @@ func newLocalBackendWithSysAndTestControl(t *testing.T, enableLogging bool, sys
|
||||
t.Fatalf("NewLocalBackend: %v", err)
|
||||
}
|
||||
t.Cleanup(b.Shutdown)
|
||||
b.DisablePortPollerForTest()
|
||||
|
||||
b.SetControlClientGetterForTesting(func(opts controlclient.Options) (controlclient.Client, error) {
|
||||
return newControl(t, opts), nil
|
||||
|
||||
@@ -258,6 +258,12 @@ func (nb *nodeBackend) PeersForTest() []tailcfg.NodeView {
|
||||
return ret
|
||||
}
|
||||
|
||||
func (nb *nodeBackend) CollectServices() bool {
|
||||
nb.mu.Lock()
|
||||
defer nb.mu.Unlock()
|
||||
return nb.netMap != nil && nb.netMap.CollectServices
|
||||
}
|
||||
|
||||
// AppendMatchingPeers returns base with all peers that match pred appended.
|
||||
//
|
||||
// It acquires b.mu to read the netmap but releases it before calling pred.
|
||||
|
||||
@@ -358,7 +358,6 @@ func TestStateMachine(t *testing.T) {
|
||||
t.Fatalf("NewLocalBackend: %v", err)
|
||||
}
|
||||
t.Cleanup(b.Shutdown)
|
||||
b.DisablePortPollerForTest()
|
||||
|
||||
var cc, previousCC *mockControl
|
||||
b.SetControlClientGetterForTesting(func(opts controlclient.Options) (controlclient.Client, error) {
|
||||
|
||||
@@ -45,7 +45,6 @@ func newBackend(opts *options) *ipnlocal.LocalBackend {
|
||||
tb.Fatalf("NewLocalBackend: %v", err)
|
||||
}
|
||||
tb.Cleanup(b.Shutdown)
|
||||
b.DisablePortPollerForTest()
|
||||
b.SetControlClientGetterForTesting(opts.MakeControlClient)
|
||||
return b
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user