all: migrate code off Notify.NetMap to Notify.SelfChange

Move tailscaled's in-tree reactive users from of IPN bus Notify.NetMap
updates to the narrower Notify.SelfChange signal introduced earlier in
this series. Consumers that need additional state (peers, DNS config,
etc.) fetch it on demand via the LocalAPI.

It is a step toward the larger goal of not fanning Notify.NetMap out
to every bus watcher on Linux/non-GUI hosts.

A future change stops sending Notify.NetMap entirely on Linux and
non-GUI platforms. (eventually once macOS/iOS/Windows migrate to the
upcoming new Notify APIs, we'll remove ipn.Notify.NetMap entirely)

Updates #12542

Change-Id: I51ea9d86bdca1909d6ac0e7d5bd3934a3a4e8516
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2026-05-01 03:47:18 +00:00
committed by Brad Fitzpatrick
parent ff9c3f0e00
commit 4c3ed5ab32
14 changed files with 498 additions and 410 deletions
+27 -27
View File
@@ -24,10 +24,10 @@ import (
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"tailscale.com/client/local" "tailscale.com/client/local"
"tailscale.com/ipn"
"tailscale.com/kube/egressservices" "tailscale.com/kube/egressservices"
"tailscale.com/kube/kubeclient" "tailscale.com/kube/kubeclient"
"tailscale.com/kube/kubetypes" "tailscale.com/kube/kubetypes"
"tailscale.com/types/netmap"
"tailscale.com/util/httpm" "tailscale.com/util/httpm"
"tailscale.com/util/linuxfw" "tailscale.com/util/linuxfw"
"tailscale.com/util/mak" "tailscale.com/util/mak"
@@ -55,7 +55,7 @@ type egressProxy struct {
tsClient *local.Client // never nil tsClient *local.Client // never nil
netmapChan chan ipn.Notify // chan to receive netmap updates on netmapChan chan *netmap.NetworkMap // chan to receive netmap updates on
podIPv4 string // never empty string, currently only IPv4 is supported podIPv4 string // never empty string, currently only IPv4 is supported
@@ -87,7 +87,7 @@ type httpClient interface {
// - the mounted egress config has changed // - the mounted egress config has changed
// - the proxy's tailnet IP addresses have changed // - the proxy's tailnet IP addresses have changed
// - tailnet IPs have changed for any backend targets specified by tailnet FQDN // - tailnet IPs have changed for any backend targets specified by tailnet FQDN
func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRunOpts) error { func (ep *egressProxy) run(ctx context.Context, nm *netmap.NetworkMap, opts egressProxyRunOpts) error {
ep.configure(opts) ep.configure(opts)
var tickChan <-chan time.Time var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event var eventChan <-chan fsnotify.Event
@@ -106,7 +106,7 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRu
eventChan = w.Events eventChan = w.Events
} }
if err := ep.sync(ctx, n); err != nil { if err := ep.sync(ctx, nm); err != nil {
return err return err
} }
for { for {
@@ -117,14 +117,14 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRu
log.Printf("periodic sync, ensuring firewall config is up to date...") log.Printf("periodic sync, ensuring firewall config is up to date...")
case <-eventChan: case <-eventChan:
log.Printf("config file change detected, ensuring firewall config is up to date...") log.Printf("config file change detected, ensuring firewall config is up to date...")
case n = <-ep.netmapChan: case nm = <-ep.netmapChan:
shouldResync := ep.shouldResync(n) shouldResync := ep.shouldResync(nm)
if !shouldResync { if !shouldResync {
continue continue
} }
log.Printf("netmap change detected, ensuring firewall config is up to date...") log.Printf("netmap change detected, ensuring firewall config is up to date...")
} }
if err := ep.sync(ctx, n); err != nil { if err := ep.sync(ctx, nm); err != nil {
return fmt.Errorf("error syncing egress service config: %w", err) return fmt.Errorf("error syncing egress service config: %w", err)
} }
} }
@@ -136,7 +136,7 @@ type egressProxyRunOpts struct {
kc kubeclient.Client kc kubeclient.Client
tsClient *local.Client tsClient *local.Client
stateSecret string stateSecret string
netmapChan chan ipn.Notify netmapChan chan *netmap.NetworkMap
podIPv4 string podIPv4 string
tailnetAddrs []netip.Prefix tailnetAddrs []netip.Prefix
} }
@@ -165,7 +165,7 @@ func (ep *egressProxy) configure(opts egressProxyRunOpts) {
// any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current // any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current
// firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such // firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such
// as failed firewall update // as failed firewall update
func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error { func (ep *egressProxy) sync(ctx context.Context, nm *netmap.NetworkMap) error {
cfgs, err := ep.getConfigs() cfgs, err := ep.getConfigs()
if err != nil { if err != nil {
return fmt.Errorf("error retrieving egress service configs: %w", err) return fmt.Errorf("error retrieving egress service configs: %w", err)
@@ -174,12 +174,12 @@ func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
if err != nil { if err != nil {
return fmt.Errorf("error retrieving current egress proxy status: %w", err) return fmt.Errorf("error retrieving current egress proxy status: %w", err)
} }
newStatus, err := ep.syncEgressConfigs(cfgs, status, n) newStatus, err := ep.syncEgressConfigs(cfgs, status, nm)
if err != nil { if err != nil {
return fmt.Errorf("error syncing egress service configs: %w", err) return fmt.Errorf("error syncing egress service configs: %w", err)
} }
if !servicesStatusIsEqual(newStatus, status) { if !servicesStatusIsEqual(newStatus, status) {
if err := ep.setStatus(ctx, newStatus, n); err != nil { if err := ep.setStatus(ctx, newStatus, nm); err != nil {
return fmt.Errorf("error setting egress proxy status: %w", err) return fmt.Errorf("error setting egress proxy status: %w", err)
} }
} }
@@ -188,14 +188,14 @@ func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
// addrsHaveChanged returns true if the provided netmap update contains tailnet address change for this proxy node. // addrsHaveChanged returns true if the provided netmap update contains tailnet address change for this proxy node.
// Netmap must not be nil. // Netmap must not be nil.
func (ep *egressProxy) addrsHaveChanged(n ipn.Notify) bool { func (ep *egressProxy) addrsHaveChanged(nm *netmap.NetworkMap) bool {
return !reflect.DeepEqual(ep.tailnetAddrs, n.NetMap.SelfNode.Addresses()) return !reflect.DeepEqual(ep.tailnetAddrs, nm.SelfNode.Addresses())
} }
// syncEgressConfigs adds and deletes firewall rules to match the desired // syncEgressConfigs adds and deletes firewall rules to match the desired
// configuration. It uses the provided status to determine what is currently // configuration. It uses the provided status to determine what is currently
// applied and updates the status after a successful sync. // applied and updates the status after a successful sync.
func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *egressservices.Status, n ipn.Notify) (*egressservices.Status, error) { func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *egressservices.Status, nm *netmap.NetworkMap) (*egressservices.Status, error) {
if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) { if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) {
return nil, nil return nil, nil
} }
@@ -214,7 +214,7 @@ func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *eg
rulesPerSvcToAdd := make(map[string][]rule, 0) rulesPerSvcToAdd := make(map[string][]rule, 0)
rulesPerSvcToDelete := make(map[string][]rule, 0) rulesPerSvcToDelete := make(map[string][]rule, 0)
for svcName, cfg := range cfgs { for svcName, cfg := range cfgs {
tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, n) tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, nm)
if err != nil { if err != nil {
return nil, fmt.Errorf("error determining tailnet target IPs: %w", err) return nil, fmt.Errorf("error determining tailnet target IPs: %w", err)
} }
@@ -229,12 +229,12 @@ func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *eg
if len(rulesToDelete) != 0 { if len(rulesToDelete) != 0 {
mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete) mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete)
} }
if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) { if len(rulesToAdd) != 0 || ep.addrsHaveChanged(nm) {
// For each tailnet target, set up SNAT from the local tailnet device address of the matching // For each tailnet target, set up SNAT from the local tailnet device address of the matching
// family. // family.
for _, t := range tailnetTargetIPs { for _, t := range tailnetTargetIPs {
var local netip.Addr var local netip.Addr
for _, pfx := range n.NetMap.SelfNode.Addresses().All() { for _, pfx := range nm.SelfNode.Addresses().All() {
if !pfx.IsSingleIP() { if !pfx.IsSingleIP() {
continue continue
} }
@@ -424,7 +424,7 @@ func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, e
// setStatus writes egress proxy's currently configured firewall to the state // setStatus writes egress proxy's currently configured firewall to the state
// Secret and updates proxy's tailnet addresses. // Secret and updates proxy's tailnet addresses.
func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error { func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, nm *netmap.NetworkMap) error {
// Pod IP is used to determine if a stored status applies to THIS proxy Pod. // Pod IP is used to determine if a stored status applies to THIS proxy Pod.
if status == nil { if status == nil {
status = &egressservices.Status{} status = &egressservices.Status{}
@@ -447,7 +447,7 @@ func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Sta
if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil { if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil {
return fmt.Errorf("error patching state Secret: %w", err) return fmt.Errorf("error patching state Secret: %w", err)
} }
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() ep.tailnetAddrs = nm.SelfNode.Addresses().AsSlice()
return nil return nil
} }
@@ -457,7 +457,7 @@ func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Sta
// FQDN, resolve the FQDN and return the resolved IPs. It checks if the // FQDN, resolve the FQDN and return the resolved IPs. It checks if the
// netfilter runner supports IPv6 NAT and skips any IPv6 addresses if it // netfilter runner supports IPv6 NAT and skips any IPv6 addresses if it
// doesn't. // doesn't.
func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) { func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, nm *netmap.NetworkMap) (addrs []netip.Addr, err error) {
if svc.TailnetTarget.IP != "" { if svc.TailnetTarget.IP != "" {
addr, err := netip.ParseAddr(svc.TailnetTarget.IP) addr, err := netip.ParseAddr(svc.TailnetTarget.IP)
if err != nil { if err != nil {
@@ -473,11 +473,11 @@ func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.N
if svc.TailnetTarget.FQDN == "" { if svc.TailnetTarget.FQDN == "" {
return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set") return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set")
} }
if n.NetMap == nil { if nm == nil {
log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN) log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN)
return addrs, nil return addrs, nil
} }
egressAddrs, err := resolveTailnetFQDN(n.NetMap, svc.TailnetTarget.FQDN) egressAddrs, err := resolveTailnetFQDN(nm, svc.TailnetTarget.FQDN)
if err != nil { if err != nil {
log.Printf("error fetching backend addresses for %q: %v", svc.TailnetTarget.FQDN, err) log.Printf("error fetching backend addresses for %q: %v", svc.TailnetTarget.FQDN, err)
return addrs, nil return addrs, nil
@@ -503,22 +503,22 @@ func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.N
// shouldResync parses netmap update and returns true if the update contains // shouldResync parses netmap update and returns true if the update contains
// changes for which the egress proxy's firewall should be reconfigured. // changes for which the egress proxy's firewall should be reconfigured.
func (ep *egressProxy) shouldResync(n ipn.Notify) bool { func (ep *egressProxy) shouldResync(nm *netmap.NetworkMap) bool {
if n.NetMap == nil { if nm == nil {
return false return false
} }
// If proxy's tailnet addresses have changed, resync. // If proxy's tailnet addresses have changed, resync.
if !reflect.DeepEqual(n.NetMap.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) { if !reflect.DeepEqual(nm.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) {
log.Printf("node addresses have changed, trigger egress config resync") log.Printf("node addresses have changed, trigger egress config resync")
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() ep.tailnetAddrs = nm.SelfNode.Addresses().AsSlice()
return true return true
} }
// If the IPs for any of the egress services configured via FQDN have // If the IPs for any of the egress services configured via FQDN have
// changed, resync. // changed, resync.
for fqdn, ips := range ep.targetFQDNs { for fqdn, ips := range ep.targetFQDNs {
for _, nn := range n.NetMap.Peers { for _, nn := range nm.Peers {
if equalFQDNs(nn.Name(), fqdn) { if equalFQDNs(nn.Name(), fqdn) {
if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) { if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) {
log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice()) log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice())
+261 -230
View File
@@ -137,6 +137,7 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"tailscale.com/client/local"
"tailscale.com/health" "tailscale.com/health"
"tailscale.com/ipn" "tailscale.com/ipn"
kubeutils "tailscale.com/k8s-operator" kubeutils "tailscale.com/k8s-operator"
@@ -536,7 +537,7 @@ authLoop:
failedResolveAttempts++ failedResolveAttempts++
} }
var egressSvcsNotify chan ipn.Notify var egressSvcsNotify chan *netmap.NetworkMap
notifyChan := make(chan ipn.Notify) notifyChan := make(chan ipn.Notify)
errChan := make(chan error) errChan := make(chan error)
go func() { go func() {
@@ -550,10 +551,17 @@ authLoop:
} }
} }
}() }()
// Peer set changes (Add/Remove) no longer ride on the IPN bus; poll
// periodically so egress FQDN resolution and peer-aware work picks
// them up. SelfChange covers prompt self changes.
const peerPollInterval = 15 * time.Second
peerPoll := time.NewTicker(peerPollInterval)
defer peerPoll.Stop()
var wg sync.WaitGroup var wg sync.WaitGroup
runLoop: runLoop:
for { for {
var processNetmap bool
select { select {
case <-ctx.Done(): case <-ctx.Done():
// Although killTailscaled() is deferred earlier, if we // Although killTailscaled() is deferred earlier, if we
@@ -566,6 +574,8 @@ runLoop:
return fmt.Errorf("failed to read from tailscaled: %w", err) return fmt.Errorf("failed to read from tailscaled: %w", err)
case err := <-cfgWatchErrChan: case err := <-cfgWatchErrChan:
return fmt.Errorf("failed to watch tailscaled config: %w", err) return fmt.Errorf("failed to watch tailscaled config: %w", err)
case <-peerPoll.C:
processNetmap = true
case n := <-notifyChan: case n := <-notifyChan:
// TODO: (ChaosInTheCRD) Add node removed check when supported by ipn // TODO: (ChaosInTheCRD) Add node removed check when supported by ipn
if n.State != nil && *n.State != ipn.Running { if n.State != nil && *n.State != ipn.Running {
@@ -576,235 +586,8 @@ runLoop:
// whereupon we'll go through initial auth again. // whereupon we'll go through initial auth again.
return fmt.Errorf("tailscaled left running state (now in state %q), exiting", *n.State) return fmt.Errorf("tailscaled left running state (now in state %q), exiting", *n.State)
} }
if n.NetMap != nil { if n.SelfChange != nil {
addrs = n.NetMap.SelfNode.Addresses().AsSlice() processNetmap = true
newCurrentIPs := deephash.Hash(&addrs)
ipsHaveChanged := newCurrentIPs != currentIPs
// Store device ID in a Kubernetes Secret before
// setting up any routing rules. This ensures
// that, for containerboot instances that are
// Kubernetes operator proxies, the operator is
// able to retrieve the device ID from the
// Kubernetes Secret to clean up tailnet nodes
// for proxies whose route setup continuously
// fails.
deviceID := n.NetMap.SelfNode.StableID()
if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceID, &deviceID) {
if err := kc.storeDeviceID(ctx, n.NetMap.SelfNode.StableID()); err != nil {
return fmt.Errorf("storing device ID in Kubernetes Secret: %w", err)
}
}
if cfg.TailnetTargetFQDN != "" {
egressAddrs, err := resolveTailnetFQDN(n.NetMap, cfg.TailnetTargetFQDN)
if err != nil {
log.Print(err.Error())
break
}
newCurentEgressIPs := deephash.Hash(&egressAddrs)
egressIPsHaveChanged := newCurentEgressIPs != currentEgressIPs
// The firewall rules get (re-)installed:
// - on startup
// - when the tailnet IPs of the tailnet target have changed
// - when the tailnet IPs of this node have changed
if (egressIPsHaveChanged || ipsHaveChanged) && len(egressAddrs) != 0 {
var rulesInstalled bool
for _, egressAddr := range egressAddrs {
ea := egressAddr.Addr()
if ea.Is4() || (ea.Is6() && nfr.HasIPV6NAT()) {
rulesInstalled = true
log.Printf("Installing forwarding rules for destination %v", ea.String())
if err := installEgressForwardingRule(ctx, ea.String(), addrs, nfr); err != nil {
return fmt.Errorf("installing egress proxy rules for destination %s: %v", ea.String(), err)
}
}
}
if !rulesInstalled {
return fmt.Errorf("no forwarding rules for egress addresses %v, host supports IPv6: %v", egressAddrs, nfr.HasIPV6NAT())
}
}
currentEgressIPs = newCurentEgressIPs
}
if cfg.ProxyTargetIP != "" && len(addrs) != 0 && ipsHaveChanged {
log.Printf("Installing proxy rules")
if err := installIngressForwardingRule(ctx, cfg.ProxyTargetIP, addrs, nfr); err != nil {
return fmt.Errorf("installing ingress proxy rules: %w", err)
}
}
if cfg.ProxyTargetDNSName != "" && len(addrs) != 0 && ipsHaveChanged {
newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName)
if err != nil {
log.Printf("[unexpected] error resolving DNS name %s: %v", cfg.ProxyTargetDNSName, err)
resetTimer(true)
continue
}
backendsHaveChanged := !(slices.EqualFunc(backendAddrs, newBackendAddrs, func(ip1 net.IP, ip2 net.IP) bool {
return slices.ContainsFunc(newBackendAddrs, func(ip net.IP) bool { return ip.Equal(ip1) })
}))
if backendsHaveChanged {
log.Printf("installing ingress proxy rules for backends %v", newBackendAddrs)
if err := installIngressForwardingRuleForDNSTarget(ctx, newBackendAddrs, addrs, nfr); err != nil {
return fmt.Errorf("error installing ingress proxy rules: %w", err)
}
}
resetTimer(false)
backendAddrs = newBackendAddrs
}
if cfg.ServeConfigPath != "" {
cd := certDomainFromNetmap(n.NetMap)
if cd == "" {
cd = kubetypes.ValueNoHTTPS
}
prev := certDomain.Swap(new(cd))
if prev == nil || *prev != cd {
select {
case certDomainChanged <- true:
default:
}
}
}
if cfg.TailnetTargetIP != "" && ipsHaveChanged && len(addrs) != 0 {
log.Printf("Installing forwarding rules for destination %v", cfg.TailnetTargetIP)
if err := installEgressForwardingRule(ctx, cfg.TailnetTargetIP, addrs, nfr); err != nil {
return fmt.Errorf("installing egress proxy rules: %w", err)
}
}
// If this is a L7 cluster ingress proxy (set up
// by Kubernetes operator) and proxying of
// cluster traffic to the ingress target is
// enabled, set up proxy rule each time the
// tailnet IPs of this node change (including
// the first time they become available).
if cfg.AllowProxyingClusterTrafficViaIngress && cfg.ServeConfigPath != "" && ipsHaveChanged && len(addrs) != 0 {
log.Printf("installing rules to forward traffic for %s to node's tailnet IP", cfg.PodIP)
if err := installTSForwardingRuleForDestination(ctx, cfg.PodIP, addrs, nfr); err != nil {
return fmt.Errorf("installing rules to forward traffic to node's tailnet IP: %w", err)
}
}
currentIPs = newCurrentIPs
// Only store device FQDN and IP addresses to
// Kubernetes Secret when any required proxy
// route setup has succeeded. IPs and FQDN are
// read from the Secret by the Tailscale
// Kubernetes operator and, for some proxy
// types, such as Tailscale Ingress, advertized
// on the Ingress status. Writing them to the
// Secret only after the proxy routing has been
// set up ensures that the operator does not
// advertize endpoints of broken proxies.
// TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'.
deviceEndpoints := []any{n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses()}
if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceEndpoints, &deviceEndpoints) {
if err := kc.storeDeviceEndpoints(ctx, n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses().AsSlice()); err != nil {
return fmt.Errorf("storing device IPs and FQDN in Kubernetes Secret: %w", err)
}
}
if healthCheck != nil {
healthCheck.Update(len(addrs) != 0)
}
var prevServeConfig *ipn.ServeConfig
if getAutoAdvertiseBool() {
prevServeConfig, err = client.GetServeConfig(ctx)
if err != nil {
return fmt.Errorf("autoadvertisement: failed to get serve config: %w", err)
}
err = refreshAdvertiseServices(ctx, prevServeConfig, klc.New(client))
if err != nil {
return fmt.Errorf("autoadvertisement: failed to refresh advertise services: %w", err)
}
}
if cfg.ServeConfigPath != "" {
triggerWatchServeConfigChanges.Do(func() {
go watchServeConfigChanges(ctx, certDomainChanged, certDomain, client, kc, cfg, prevServeConfig)
})
}
if egressSvcsNotify != nil {
egressSvcsNotify <- n
}
}
if !startupTasksDone {
// For containerboot instances that act as TCP proxies (proxying traffic to an endpoint
// passed via one of the env vars that containerboot reads) and store state in a
// Kubernetes Secret, we consider startup tasks done at the point when device info has
// been successfully stored to state Secret. For all other containerboot instances, if
// we just get to this point the startup tasks can be considered done.
if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) {
// This log message is used in tests to detect when all
// post-auth configuration is done.
log.Println("Startup complete, waiting for shutdown signal")
startupTasksDone = true
// Configure egress proxy. Egress proxy will set up firewall rules to proxy
// traffic to tailnet targets configured in the provided configuration file. It
// will then continuously monitor the config file and netmap updates and
// reconfigure the firewall rules as needed. If any of its operations fail, it
// will crash this node.
if cfg.EgressProxiesCfgPath != "" {
log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressProxiesCfgPath)
egressSvcsNotify = make(chan ipn.Notify)
opts := egressProxyRunOpts{
cfgPath: cfg.EgressProxiesCfgPath,
nfr: nfr,
kc: kc,
tsClient: client,
stateSecret: cfg.KubeSecret,
netmapChan: egressSvcsNotify,
podIPv4: cfg.PodIPv4,
tailnetAddrs: addrs,
}
go func() {
if err := ep.run(ctx, n, opts); err != nil {
egressSvcsErrorChan <- err
}
}()
}
ip := ingressProxy{}
if cfg.IngressProxiesCfgPath != "" {
log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath)
opts := ingressProxyOpts{
cfgPath: cfg.IngressProxiesCfgPath,
nfr: nfr,
kc: kc,
stateSecret: cfg.KubeSecret,
podIPv4: cfg.PodIPv4,
podIPv6: cfg.PodIPv6,
}
go func() {
if err := ip.run(ctx, opts); err != nil {
ingressSvcsErrorChan <- err
}
}()
}
// Wait on tailscaled process. It won't be cleaned up by default when the
// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
// reaper by a running cmd.Wait in a goroutine immediately after starting
// tailscaled?
reaper := func() {
defer wg.Done()
for {
var status unix.WaitStatus
_, err := unix.Wait4(daemonProcess.Pid, &status, 0, nil)
if errors.Is(err, unix.EINTR) {
continue
}
if err != nil {
log.Fatalf("Waiting for tailscaled to exit: %v", err)
}
log.Print("tailscaled exited")
os.Exit(0)
}
}
wg.Add(1)
go reaper()
}
} }
case <-tc: case <-tc:
newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName) newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName)
@@ -824,11 +607,250 @@ runLoop:
} }
backendAddrs = newBackendAddrs backendAddrs = newBackendAddrs
resetTimer(false) resetTimer(false)
continue
case e := <-egressSvcsErrorChan: case e := <-egressSvcsErrorChan:
return fmt.Errorf("egress proxy failed: %v", e) return fmt.Errorf("egress proxy failed: %v", e)
case e := <-ingressSvcsErrorChan: case e := <-ingressSvcsErrorChan:
return fmt.Errorf("ingress proxy failed: %v", e) return fmt.Errorf("ingress proxy failed: %v", e)
} }
if !processNetmap {
continue
}
nm, err := fetchNetMap(ctx, client)
if err != nil {
log.Printf("error fetching netmap: %v", err)
continue
}
if nm != nil {
addrs = nm.SelfNode.Addresses().AsSlice()
newCurrentIPs := deephash.Hash(&addrs)
ipsHaveChanged := newCurrentIPs != currentIPs
// Store device ID in a Kubernetes Secret before
// setting up any routing rules. This ensures
// that, for containerboot instances that are
// Kubernetes operator proxies, the operator is
// able to retrieve the device ID from the
// Kubernetes Secret to clean up tailnet nodes
// for proxies whose route setup continuously
// fails.
deviceID := nm.SelfNode.StableID()
if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceID, &deviceID) {
if err := kc.storeDeviceID(ctx, nm.SelfNode.StableID()); err != nil {
return fmt.Errorf("storing device ID in Kubernetes Secret: %w", err)
}
}
if cfg.TailnetTargetFQDN != "" {
egressAddrs, err := resolveTailnetFQDN(nm, cfg.TailnetTargetFQDN)
if err != nil {
log.Print(err.Error())
break
}
newCurentEgressIPs := deephash.Hash(&egressAddrs)
egressIPsHaveChanged := newCurentEgressIPs != currentEgressIPs
// The firewall rules get (re-)installed:
// - on startup
// - when the tailnet IPs of the tailnet target have changed
// - when the tailnet IPs of this node have changed
if (egressIPsHaveChanged || ipsHaveChanged) && len(egressAddrs) != 0 {
var rulesInstalled bool
for _, egressAddr := range egressAddrs {
ea := egressAddr.Addr()
if ea.Is4() || (ea.Is6() && nfr.HasIPV6NAT()) {
rulesInstalled = true
log.Printf("Installing forwarding rules for destination %v", ea.String())
if err := installEgressForwardingRule(ctx, ea.String(), addrs, nfr); err != nil {
return fmt.Errorf("installing egress proxy rules for destination %s: %v", ea.String(), err)
}
}
}
if !rulesInstalled {
return fmt.Errorf("no forwarding rules for egress addresses %v, host supports IPv6: %v", egressAddrs, nfr.HasIPV6NAT())
}
}
currentEgressIPs = newCurentEgressIPs
}
if cfg.ProxyTargetIP != "" && len(addrs) != 0 && ipsHaveChanged {
log.Printf("Installing proxy rules")
if err := installIngressForwardingRule(ctx, cfg.ProxyTargetIP, addrs, nfr); err != nil {
return fmt.Errorf("installing ingress proxy rules: %w", err)
}
}
if cfg.ProxyTargetDNSName != "" && len(addrs) != 0 && ipsHaveChanged {
newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName)
if err != nil {
log.Printf("[unexpected] error resolving DNS name %s: %v", cfg.ProxyTargetDNSName, err)
resetTimer(true)
continue
}
backendsHaveChanged := !(slices.EqualFunc(backendAddrs, newBackendAddrs, func(ip1 net.IP, ip2 net.IP) bool {
return slices.ContainsFunc(newBackendAddrs, func(ip net.IP) bool { return ip.Equal(ip1) })
}))
if backendsHaveChanged {
log.Printf("installing ingress proxy rules for backends %v", newBackendAddrs)
if err := installIngressForwardingRuleForDNSTarget(ctx, newBackendAddrs, addrs, nfr); err != nil {
return fmt.Errorf("error installing ingress proxy rules: %w", err)
}
}
resetTimer(false)
backendAddrs = newBackendAddrs
}
if cfg.ServeConfigPath != "" {
cd := certDomainFromNetmap(nm)
if cd == "" {
cd = kubetypes.ValueNoHTTPS
}
prev := certDomain.Swap(new(cd))
if prev == nil || *prev != cd {
select {
case certDomainChanged <- true:
default:
}
}
}
if cfg.TailnetTargetIP != "" && ipsHaveChanged && len(addrs) != 0 {
log.Printf("Installing forwarding rules for destination %v", cfg.TailnetTargetIP)
if err := installEgressForwardingRule(ctx, cfg.TailnetTargetIP, addrs, nfr); err != nil {
return fmt.Errorf("installing egress proxy rules: %w", err)
}
}
// If this is a L7 cluster ingress proxy (set up
// by Kubernetes operator) and proxying of
// cluster traffic to the ingress target is
// enabled, set up proxy rule each time the
// tailnet IPs of this node change (including
// the first time they become available).
if cfg.AllowProxyingClusterTrafficViaIngress && cfg.ServeConfigPath != "" && ipsHaveChanged && len(addrs) != 0 {
log.Printf("installing rules to forward traffic for %s to node's tailnet IP", cfg.PodIP)
if err := installTSForwardingRuleForDestination(ctx, cfg.PodIP, addrs, nfr); err != nil {
return fmt.Errorf("installing rules to forward traffic to node's tailnet IP: %w", err)
}
}
currentIPs = newCurrentIPs
// Only store device FQDN and IP addresses to
// Kubernetes Secret when any required proxy
// route setup has succeeded. IPs and FQDN are
// read from the Secret by the Tailscale
// Kubernetes operator and, for some proxy
// types, such as Tailscale Ingress, advertized
// on the Ingress status. Writing them to the
// Secret only after the proxy routing has been
// set up ensures that the operator does not
// advertize endpoints of broken proxies.
// TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'.
deviceEndpoints := []any{nm.SelfNode.Name(), nm.SelfNode.Addresses()}
if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceEndpoints, &deviceEndpoints) {
if err := kc.storeDeviceEndpoints(ctx, nm.SelfNode.Name(), nm.SelfNode.Addresses().AsSlice()); err != nil {
return fmt.Errorf("storing device IPs and FQDN in Kubernetes Secret: %w", err)
}
}
if healthCheck != nil {
healthCheck.Update(len(addrs) != 0)
}
var prevServeConfig *ipn.ServeConfig
if getAutoAdvertiseBool() {
prevServeConfig, err = client.GetServeConfig(ctx)
if err != nil {
return fmt.Errorf("autoadvertisement: failed to get serve config: %w", err)
}
err = refreshAdvertiseServices(ctx, prevServeConfig, klc.New(client))
if err != nil {
return fmt.Errorf("autoadvertisement: failed to refresh advertise services: %w", err)
}
}
if cfg.ServeConfigPath != "" {
triggerWatchServeConfigChanges.Do(func() {
go watchServeConfigChanges(ctx, certDomainChanged, certDomain, client, kc, cfg, prevServeConfig)
})
}
if egressSvcsNotify != nil {
egressSvcsNotify <- nm
}
}
if !startupTasksDone {
// For containerboot instances that act as TCP proxies (proxying traffic to an endpoint
// passed via one of the env vars that containerboot reads) and store state in a
// Kubernetes Secret, we consider startup tasks done at the point when device info has
// been successfully stored to state Secret. For all other containerboot instances, if
// we just get to this point the startup tasks can be considered done.
if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) {
// This log message is used in tests to detect when all
// post-auth configuration is done.
log.Println("Startup complete, waiting for shutdown signal")
startupTasksDone = true
// Configure egress proxy. Egress proxy will set up firewall rules to proxy
// traffic to tailnet targets configured in the provided configuration file. It
// will then continuously monitor the config file and netmap updates and
// reconfigure the firewall rules as needed. If any of its operations fail, it
// will crash this node.
if cfg.EgressProxiesCfgPath != "" {
log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressProxiesCfgPath)
egressSvcsNotify = make(chan *netmap.NetworkMap)
opts := egressProxyRunOpts{
cfgPath: cfg.EgressProxiesCfgPath,
nfr: nfr,
kc: kc,
tsClient: client,
stateSecret: cfg.KubeSecret,
netmapChan: egressSvcsNotify,
podIPv4: cfg.PodIPv4,
tailnetAddrs: addrs,
}
go func() {
if err := ep.run(ctx, nm, opts); err != nil {
egressSvcsErrorChan <- err
}
}()
}
ip := ingressProxy{}
if cfg.IngressProxiesCfgPath != "" {
log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath)
opts := ingressProxyOpts{
cfgPath: cfg.IngressProxiesCfgPath,
nfr: nfr,
kc: kc,
stateSecret: cfg.KubeSecret,
podIPv4: cfg.PodIPv4,
podIPv6: cfg.PodIPv6,
}
go func() {
if err := ip.run(ctx, opts); err != nil {
ingressSvcsErrorChan <- err
}
}()
}
// Wait on tailscaled process. It won't be cleaned up by default when the
// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
// reaper by a running cmd.Wait in a goroutine immediately after starting
// tailscaled?
reaper := func() {
defer wg.Done()
for {
var status unix.WaitStatus
_, err := unix.Wait4(daemonProcess.Pid, &status, 0, nil)
if errors.Is(err, unix.EINTR) {
continue
}
if err != nil {
log.Fatalf("Waiting for tailscaled to exit: %v", err)
}
log.Print("tailscaled exited")
os.Exit(0)
}
}
wg.Add(1)
go reaper()
}
}
} }
wg.Wait() wg.Wait()
@@ -963,6 +985,15 @@ func runHTTPServer(mux *http.ServeMux, addr string) (close func() error) {
} }
} }
// fetchNetMap fetches the current netmap from tailscaled via the
// "current-netmap" localapi debug action. The debug action's payload
// shape is intentionally not part of any stable API; containerboot
// reads its own internal-package types out of it. New external consumers
// should not rely on this — see [local.Client.Status] and friends.
func fetchNetMap(ctx context.Context, lc *local.Client) (*netmap.NetworkMap, error) {
return local.GetDebugResultJSON[*netmap.NetworkMap](ctx, lc, "current-netmap")
}
// resolveTailnetFQDN resolves a tailnet FQDN to a list of IP prefixes, which // resolveTailnetFQDN resolves a tailnet FQDN to a list of IP prefixes, which
// can be either a peer device or a Tailscale Service. // can be either a peer device or a Tailscale Service.
func resolveTailnetFQDN(nm *netmap.NetworkMap, fqdn string) ([]netip.Prefix, error) { func resolveTailnetFQDN(nm *netmap.NetworkMap, fqdn string) ([]netip.Prefix, error) {
+104 -36
View File
@@ -71,6 +71,12 @@ func TestContainerBoot(t *testing.T) {
// Waits below to be true before proceeding to the next phase. // Waits below to be true before proceeding to the next phase.
Notify *ipn.Notify Notify *ipn.Notify
// If non-nil, install this NetMap on the fake LocalAPI before
// sending Notify. This is the replacement for the old
// Notify.NetMap field; reactive consumers fetch the current
// netmap via /localapi/v0/netmap on their own.
NetMap *netmap.NetworkMap
// WantCmds is the commands that containerboot should run in this phase. // WantCmds is the commands that containerboot should run in this phase.
WantCmds []string WantCmds []string
@@ -105,12 +111,10 @@ func TestContainerBoot(t *testing.T) {
} }
runningNotify := &ipn.Notify{ runningNotify := &ipn.Notify{
State: new(ipn.Running), State: new(ipn.Running),
NetMap: &netmap.NetworkMap{ SelfChange: &tailcfg.Node{
SelfNode: (&tailcfg.Node{ StableID: tailcfg.StableNodeID("myID"),
StableID: tailcfg.StableNodeID("myID"), Name: "test-node.test.ts.net.",
Name: "test-node.test.ts.net.", Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
}).View(),
}, },
} }
type testCase struct { type testCase struct {
@@ -383,19 +387,24 @@ func TestContainerBoot(t *testing.T) {
{ {
Notify: &ipn.Notify{ Notify: &ipn.Notify{
State: new(ipn.Running), State: new(ipn.Running),
NetMap: &netmap.NetworkMap{ SelfChange: &tailcfg.Node{
SelfNode: (&tailcfg.Node{ StableID: tailcfg.StableNodeID("myID"),
StableID: tailcfg.StableNodeID("myID"), Name: "test-node.test.ts.net.",
Name: "test-node.test.ts.net.", Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, },
},
NetMap: &netmap.NetworkMap{
SelfNode: (&tailcfg.Node{
StableID: tailcfg.StableNodeID("myID"),
Name: "test-node.test.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
StableID: tailcfg.StableNodeID("ipv6ID"),
Name: "ipv6-node.test.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("::1/128")},
}).View(), }).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
StableID: tailcfg.StableNodeID("ipv6ID"),
Name: "ipv6-node.test.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("::1/128")},
}).View(),
},
}, },
}, },
WantLog: "no forwarding rules for egress addresses [::1/128], host supports IPv6: false", WantLog: "no forwarding rules for egress addresses [::1/128], host supports IPv6: false",
@@ -631,14 +640,19 @@ func TestContainerBoot(t *testing.T) {
{ {
Notify: &ipn.Notify{ Notify: &ipn.Notify{
State: new(ipn.Running), State: new(ipn.Running),
NetMap: &netmap.NetworkMap{ SelfChange: &tailcfg.Node{
SelfNode: (&tailcfg.Node{ StableID: tailcfg.StableNodeID("newID"),
StableID: tailcfg.StableNodeID("newID"), Name: "new-name.test.ts.net.",
Name: "new-name.test.ts.net.", Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
}).View(),
}, },
}, },
NetMap: &netmap.NetworkMap{
SelfNode: (&tailcfg.Node{
StableID: tailcfg.StableNodeID("newID"),
Name: "new-name.test.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
}).View(),
},
WantKubeSecret: map[string]string{ WantKubeSecret: map[string]string{
"authkey": "tskey-key", "authkey": "tskey-key",
"device_fqdn": "new-name.test.ts.net.", "device_fqdn": "new-name.test.ts.net.",
@@ -1095,19 +1109,24 @@ func TestContainerBoot(t *testing.T) {
{ {
Notify: &ipn.Notify{ Notify: &ipn.Notify{
State: new(ipn.Running), State: new(ipn.Running),
NetMap: &netmap.NetworkMap{ SelfChange: &tailcfg.Node{
SelfNode: (&tailcfg.Node{ StableID: tailcfg.StableNodeID("myID"),
StableID: tailcfg.StableNodeID("myID"), Name: "test-node.test.ts.net.",
Name: "test-node.test.ts.net.", Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, },
},
NetMap: &netmap.NetworkMap{
SelfNode: (&tailcfg.Node{
StableID: tailcfg.StableNodeID("myID"),
Name: "test-node.test.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")},
}).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
StableID: tailcfg.StableNodeID("fooID"),
Name: "foo.tailnetxyz.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.2/32")},
}).View(), }).View(),
Peers: []tailcfg.NodeView{
(&tailcfg.Node{
StableID: tailcfg.StableNodeID("fooID"),
Name: "foo.tailnetxyz.ts.net.",
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.2/32")},
}).View(),
},
}, },
}, },
WantKubeSecret: map[string]string{ WantKubeSecret: map[string]string{
@@ -1276,6 +1295,18 @@ func TestContainerBoot(t *testing.T) {
t.Fatalf("phase %d: updating mtime for %q: %v", i, path, err) t.Fatalf("phase %d: updating mtime for %q: %v", i, path, err)
} }
} }
nmForFake := p.NetMap
if nmForFake == nil && p.Notify != nil && p.Notify.SelfChange != nil {
// Synthesize a minimal netmap from SelfChange so
// containerboot's NetMap() fetch returns
// something usable when the test only set Notify.
nmForFake = &netmap.NetworkMap{
SelfNode: p.Notify.SelfChange.View(),
}
}
if nmForFake != nil {
env.lapi.SetNetMap(nmForFake)
}
env.lapi.Notify(p.Notify) env.lapi.Notify(p.Notify)
if p.Signal != nil { if p.Signal != nil {
cmd.Process.Signal(*p.Signal) cmd.Process.Signal(*p.Signal)
@@ -1468,6 +1499,7 @@ type localAPI struct {
sync.Mutex sync.Mutex
cond *sync.Cond cond *sync.Cond
notify *ipn.Notify notify *ipn.Notify
netmap *netmap.NetworkMap // served by /localapi/v0/netmap
} }
func (lc *localAPI) Start() error { func (lc *localAPI) Start() error {
@@ -1504,8 +1536,44 @@ func (lc *localAPI) Notify(n *ipn.Notify) {
lc.cond.Broadcast() lc.cond.Broadcast()
} }
// SetNetMap installs the netmap that the fake /localapi/v0/netmap endpoint
// will return.
func (lc *localAPI) SetNetMap(nm *netmap.NetworkMap) {
lc.Lock()
defer lc.Unlock()
lc.netmap = nm
}
func (lc *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (lc *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path { switch r.URL.Path {
case "/localapi/v0/netmap":
w.Header().Set("Content-Type", "application/json")
lc.Lock()
nm := lc.netmap
lc.Unlock()
if nm == nil {
http.Error(w, "no netmap", http.StatusServiceUnavailable)
return
}
json.NewEncoder(w).Encode(nm)
return
case "/localapi/v0/debug":
// containerboot fetches the netmap via the "current-netmap"
// debug action; serve it like /localapi/v0/netmap above.
if r.URL.Query().Get("action") != "current-netmap" {
http.Error(w, "unsupported debug action", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
lc.Lock()
nm := lc.netmap
lc.Unlock()
if nm == nil {
http.Error(w, "no netmap", http.StatusServiceUnavailable)
return
}
json.NewEncoder(w).Encode(nm)
return
case "/localapi/v0/serve-config": case "/localapi/v0/serve-config":
switch r.Method { switch r.Method {
case "GET": case "GET":
+26 -24
View File
@@ -138,9 +138,9 @@ func run(ctx context.Context, ts *tsnet.Server, wgPort int, hostname string, pro
} }
// Finally, start mainloop to configure app connector based on information // Finally, start mainloop to configure app connector based on information
// in the netmap. // in the self node's CapMap. We set NotifyInitialNetMap so the first
// We set the NotifyInitialNetMap flag so we will always get woken with the // Notify carries the current self node (now via Notify.SelfChange);
// current netmap, before only being woken on changes. // subsequent self changes wake us up too.
bus, err := lc.WatchIPNBus(ctx, ipn.NotifyWatchEngineUpdates|ipn.NotifyInitialNetMap) bus, err := lc.WatchIPNBus(ctx, ipn.NotifyWatchEngineUpdates|ipn.NotifyInitialNetMap)
if err != nil { if err != nil {
log.Fatalf("watching IPN bus: %v", err) log.Fatalf("watching IPN bus: %v", err)
@@ -155,28 +155,30 @@ func run(ctx context.Context, ts *tsnet.Server, wgPort int, hostname string, pro
log.Fatalf("reading IPN bus: %v", err) log.Fatalf("reading IPN bus: %v", err)
} }
// NetMap contains app-connector configuration self := msg.SelfChange
if nm := msg.NetMap; nm != nil && nm.SelfNode.Valid() { if self == nil {
var c appctype.AppConnectorConfig continue
nmConf, err := tailcfg.UnmarshalNodeCapViewJSON[appctype.AppConnectorConfig](nm.SelfNode.CapMap(), configCapKey)
if err != nil {
log.Printf("failed to read app connector configuration from coordination server: %v", err)
} else if len(nmConf) > 0 {
c = nmConf[0]
}
if c.AdvertiseRoutes {
if err := s.advertiseRoutesFromConfig(ctx, &c); err != nil {
log.Printf("failed to advertise routes: %v", err)
}
}
// Backwards compatibility: combine any configuration from control with flags specified
// on the command line. This is intentionally done after we advertise any routes
// because its never correct to advertise the nodes native IP addresses.
s.mergeConfigFromFlags(&c, ports, forwards)
s.srv.Configure(&c)
} }
var c appctype.AppConnectorConfig
// View() lets us reuse the existing CapView decoder.
nmConf, err := tailcfg.UnmarshalNodeCapViewJSON[appctype.AppConnectorConfig](self.View().CapMap(), configCapKey)
if err != nil {
log.Printf("failed to read app connector configuration from coordination server: %v", err)
} else if len(nmConf) > 0 {
c = nmConf[0]
}
if c.AdvertiseRoutes {
if err := s.advertiseRoutesFromConfig(ctx, &c); err != nil {
log.Printf("failed to advertise routes: %v", err)
}
}
// Backwards compatibility: combine any configuration from control with flags specified
// on the command line. This is intentionally done after we advertise any routes
// because its never correct to advertise the nodes native IP addresses.
s.mergeConfigFromFlags(&c, ports, forwards)
s.srv.Configure(&c)
} }
} }
+3 -3
View File
@@ -287,9 +287,9 @@ func runCp(ctx context.Context, args []string) error {
// caller's progress display stays at 0 — exactly the right degradation, // caller's progress display stays at 0 — exactly the right degradation,
// since the warning timer will then fire on its normal 3-second deadline. // since the warning timer will then fire on its normal 3-second deadline.
func watchOutgoingFiles(ctx context.Context, peer tailcfg.StableNodeID, onUpdate func(name string, sent int64)) { func watchOutgoingFiles(ctx context.Context, peer tailcfg.StableNodeID, onUpdate func(name string, sent int64)) {
// NotifyPeerChanges asks the broadcaster to deliver incremental peer // NotifyPeerChanges opts in to per-peer add/remove notifications so the
// updates as small PeerChanges blobs in place of the full NetMap, which // bus stays responsive without us also subscribing to the full NetMap,
// we don't read anyway. (See ipn/ipnlocal/local.go's notify-elision.) // which we don't read here.
w, err := localClient.WatchIPNBus(ctx, ipn.NotifyInitialOutgoingFiles|ipn.NotifyPeerChanges) w, err := localClient.WatchIPNBus(ctx, ipn.NotifyInitialOutgoingFiles|ipn.NotifyPeerChanges)
if err != nil { if err != nil {
return return
+2 -2
View File
@@ -848,10 +848,10 @@ func (e *serveEnv) enableFeatureInteractive(ctx context.Context, feature string,
e.lc.IncrementCounter(ctx, fmt.Sprintf("%s_enablement_lost_connection", feature), 1) e.lc.IncrementCounter(ctx, fmt.Sprintf("%s_enablement_lost_connection", feature), 1)
return err return err
} }
if nm := n.NetMap; nm != nil && nm.SelfNode.Valid() { if self := n.SelfChange; self != nil {
gotAll := true gotAll := true
for _, c := range caps { for _, c := range caps {
if !nm.SelfNode.HasCap(c) { if _, has := self.CapMap[c]; !has {
// The feature is not yet enabled. // The feature is not yet enabled.
// Continue blocking until it is. // Continue blocking until it is.
gotAll = false gotAll = false
+1 -1
View File
@@ -732,7 +732,7 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
if s := n.State; s != nil { if s := n.State; s != nil {
ipnIsRunning = *s == ipn.Running ipnIsRunning = *s == ipn.Running
} }
if n.NetMap != nil && n.NetMap.NodeKey != origNodeKey { if n.SelfChange != nil && n.SelfChange.Key != origNodeKey {
waitingForKeyChange = false waitingForKeyChange = false
} }
if ipnIsRunning && !waitingForKeyChange { if ipnIsRunning && !waitingForKeyChange {
+42 -36
View File
@@ -258,44 +258,50 @@ func (i *jsIPN) run(jsCallbacks js.Value) {
if n.State != nil { if n.State != nil {
notifyState(*n.State) notifyState(*n.State)
} }
if nm := n.NetMap; nm != nil { if n.SelfChange != nil {
jsNetMap := jsNetMap{ // Self changed: rebuild the JS-side NetMap snapshot. Peers
Self: jsNetMapSelfNode{ // don't ride on the bus anymore, so fetch them on demand
jsNetMapNode: jsNetMapNode{ // from LocalBackend.
Name: nm.SelfName(), nm := i.lb.NetMapWithPeers()
Addresses: mapSliceView(nm.GetAddresses(), func(a netip.Prefix) string { return a.Addr().String() }), if nm != nil {
NodeKey: nm.NodeKey.String(), jsNetMap := jsNetMap{
MachineKey: nm.MachineKey.String(), Self: jsNetMapSelfNode{
},
MachineStatus: jsMachineStatus[nm.GetMachineStatus()],
},
Peers: mapSlice(nm.Peers, func(p tailcfg.NodeView) jsNetMapPeerNode {
name := p.Name()
if name == "" {
// In practice this should only happen for Hello.
name = p.Hostinfo().Hostname()
}
addrs := make([]string, p.Addresses().Len())
for i, ap := range p.Addresses().All() {
addrs[i] = ap.Addr().String()
}
return jsNetMapPeerNode{
jsNetMapNode: jsNetMapNode{ jsNetMapNode: jsNetMapNode{
Name: name, Name: nm.SelfName(),
Addresses: addrs, Addresses: mapSliceView(nm.GetAddresses(), func(a netip.Prefix) string { return a.Addr().String() }),
MachineKey: p.Machine().String(), NodeKey: nm.NodeKey.String(),
NodeKey: p.Key().String(), MachineKey: nm.MachineKey.String(),
}, },
Online: p.Online().Clone(), MachineStatus: jsMachineStatus[nm.GetMachineStatus()],
TailscaleSSHEnabled: p.Hostinfo().TailscaleSSHEnabled(), },
} Peers: mapSlice(nm.Peers, func(p tailcfg.NodeView) jsNetMapPeerNode {
}), name := p.Name()
LockedOut: nm.TKAEnabled && nm.SelfNode.KeySignature().Len() == 0, if name == "" {
} // In practice this should only happen for Hello.
if jsonNetMap, err := json.Marshal(jsNetMap); err == nil { name = p.Hostinfo().Hostname()
jsCallbacks.Call("notifyNetMap", string(jsonNetMap)) }
} else { addrs := make([]string, p.Addresses().Len())
log.Printf("Could not generate JSON netmap: %v", err) for i, ap := range p.Addresses().All() {
addrs[i] = ap.Addr().String()
}
return jsNetMapPeerNode{
jsNetMapNode: jsNetMapNode{
Name: name,
Addresses: addrs,
MachineKey: p.Machine().String(),
NodeKey: p.Key().String(),
},
Online: p.Online().Clone(),
TailscaleSSHEnabled: p.Hostinfo().TailscaleSSHEnabled(),
}
}),
LockedOut: nm.TKAEnabled && nm.SelfNode.KeySignature().Len() == 0,
}
if jsonNetMap, err := json.Marshal(jsNetMap); err == nil {
jsCallbacks.Call("notifyNetMap", string(jsonNetMap))
} else {
log.Printf("Could not generate JSON netmap: %v", err)
}
} }
} }
if n.BrowseToURL != nil { if n.BrowseToURL != nil {
+4 -9
View File
@@ -171,14 +171,9 @@ func (cm *CertManager) runCertLoop(ctx context.Context, domain string) {
} }
} }
// waitForCertDomain ensures the requested domain is in the list of allowed // domains before issuing the cert for the first time. It uses the IPN bus
// domains before issuing the cert for the first time. // only as a wake-up trigger (Notify.SelfChange) and queries the current
// It uses the IPN bus only as a wake-up trigger and queries the current cert // cert domains explicitly via [LocalClient.CertDomains].
// domains explicitly via [LocalClient.CertDomains].
//
// TODO(bradfitz): once Notify.SelfChange lands upstream, switch this to
// watch for SelfChange events instead of NotifyInitialNetMap, and drop the
// netmap dependency on the bus entirely.
func (cm *CertManager) waitForCertDomain(ctx context.Context, domain string) error { func (cm *CertManager) waitForCertDomain(ctx context.Context, domain string) error {
w, err := cm.lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap) w, err := cm.lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
if err != nil { if err != nil {
@@ -191,7 +186,7 @@ func (cm *CertManager) waitForCertDomain(ctx context.Context, domain string) err
if err != nil { if err != nil {
return err return err
} }
if n.NetMap == nil { if n.SelfChange == nil {
continue continue
} }
domains, err := cm.lc.CertDomains(ctx) domains, err := cm.lc.CertDomains(ctx)
+5 -6
View File
@@ -12,7 +12,6 @@ import (
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/kube/localclient" "tailscale.com/kube/localclient"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/netmap"
) )
// TestEnsureCertLoops tests that the certManager correctly starts and stops // TestEnsureCertLoops tests that the certManager correctly starts and stops
@@ -201,12 +200,12 @@ func TestEnsureCertLoops(t *testing.T) {
notifyChan := make(chan ipn.Notify) notifyChan := make(chan ipn.Notify)
go func() { go func() {
// Drive waitForCertDomain by sending notifications // SelfChange wakes the cert manager; cert domains are
// with empty netmaps as wake-up triggers; the cert // then fetched via FakeLocalClient.CertDomainsResult.
// manager queries CertDomains via the local
// client and not by reading the bus payload.
for { for {
notifyChan <- ipn.Notify{NetMap: &netmap.NetworkMap{}} notifyChan <- ipn.Notify{
SelfChange: &tailcfg.Node{StableID: "test"},
}
} }
}() }()
cm := &CertManager{ cm := &CertManager{
+2 -2
View File
@@ -65,8 +65,8 @@ func (h *Healthz) MonitorHealth(ctx context.Context, lc *local.Client) error {
return err return err
} }
if n.NetMap != nil { if self := n.SelfChange; self != nil {
h.Update(n.NetMap.SelfNode.Addresses().Len() != 0) h.Update(len(self.Addresses) != 0)
} }
} }
} }
+9 -8
View File
@@ -44,9 +44,9 @@ func SetInitialKeys(store ipn.StateStore, podUID string) error {
// KeepKeysUpdated sets state store keys consistent with containerboot to // KeepKeysUpdated sets state store keys consistent with containerboot to
// signal proxy readiness to the operator. It runs until its context is // signal proxy readiness to the operator. It runs until its context is
// cancelled or it hits an error. The passed in next function is expected to be // cancelled or it hits an error. It watches the IPN bus for SelfChange
// from a local.IPNBusWatcher that is at least subscribed to // notifications (which fire whenever the self node changes) and reads
// ipn.NotifyInitialNetMap. // the new self node directly from the notify.
func KeepKeysUpdated(ctx context.Context, store ipn.StateStore, lc klc.LocalClient) error { func KeepKeysUpdated(ctx context.Context, store ipn.StateStore, lc klc.LocalClient) error {
w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap) w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
if err != nil { if err != nil {
@@ -63,25 +63,26 @@ func KeepKeysUpdated(ctx context.Context, store ipn.StateStore, lc klc.LocalClie
} }
return err return err
} }
if n.NetMap == nil { self := n.SelfChange
if self == nil {
continue continue
} }
if deviceID := n.NetMap.SelfNode.StableID(); deephash.Update(&currentDeviceID, &deviceID) { if deviceID := self.StableID; deephash.Update(&currentDeviceID, &deviceID) {
if err := store.WriteState(keyDeviceID, []byte(deviceID)); err != nil { if err := store.WriteState(keyDeviceID, []byte(deviceID)); err != nil {
return fmt.Errorf("failed to store device ID in state: %w", err) return fmt.Errorf("failed to store device ID in state: %w", err)
} }
} }
if fqdn := n.NetMap.SelfNode.Name(); deephash.Update(&currentDeviceFQDN, &fqdn) { if fqdn := self.Name; deephash.Update(&currentDeviceFQDN, &fqdn) {
if err := store.WriteState(keyDeviceFQDN, []byte(fqdn)); err != nil { if err := store.WriteState(keyDeviceFQDN, []byte(fqdn)); err != nil {
return fmt.Errorf("failed to store device FQDN in state: %w", err) return fmt.Errorf("failed to store device FQDN in state: %w", err)
} }
} }
if addrs := n.NetMap.SelfNode.Addresses(); deephash.Update(&currentDeviceIPs, &addrs) { if addrs := self.Addresses; deephash.Update(&currentDeviceIPs, &addrs) {
var deviceIPs []string var deviceIPs []string
for _, addr := range addrs.AsSlice() { for _, addr := range addrs {
deviceIPs = append(deviceIPs, addr.Addr().String()) deviceIPs = append(deviceIPs, addr.Addr().String())
} }
deviceIPsValue, err := json.Marshal(deviceIPs) deviceIPsValue, err := json.Marshal(deviceIPs)
+8 -13
View File
@@ -18,7 +18,6 @@ import (
klc "tailscale.com/kube/localclient" klc "tailscale.com/kube/localclient"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/netmap"
) )
func TestSetInitialStateKeys(t *testing.T) { func TestSetInitialStateKeys(t *testing.T) {
@@ -133,12 +132,10 @@ func TestKeepStateKeysUpdated(t *testing.T) {
{ {
name: "authed", name: "authed",
notify: ipn.Notify{ notify: ipn.Notify{
NetMap: &netmap.NetworkMap{ SelfChange: &tailcfg.Node{
SelfNode: (&tailcfg.Node{ StableID: "TESTCTRL00000001",
StableID: "TESTCTRL00000001", Name: "test-node.test.ts.net",
Name: "test-node.test.ts.net", Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"), netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"), netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")},
}).View(),
}, },
}, },
expected: []string{ expected: []string{
@@ -150,12 +147,10 @@ func TestKeepStateKeysUpdated(t *testing.T) {
{ {
name: "updated_fields", name: "updated_fields",
notify: ipn.Notify{ notify: ipn.Notify{
NetMap: &netmap.NetworkMap{ SelfChange: &tailcfg.Node{
SelfNode: (&tailcfg.Node{ StableID: "TESTCTRL00000001",
StableID: "TESTCTRL00000001", Name: "updated.test.ts.net",
Name: "updated.test.ts.net", Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.250/32")},
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.250/32")},
}).View(),
}, },
}, },
expected: []string{ expected: []string{
+4 -13
View File
@@ -12,7 +12,6 @@ import (
"net/http" "net/http"
"slices" "slices"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate" "tailscale.com/ipn/ipnstate"
"tailscale.com/tsnet" "tailscale.com/tsnet"
"tailscale.com/util/dnsname" "tailscale.com/util/dnsname"
@@ -108,24 +107,16 @@ func (m *monitor) handleNetmap(w http.ResponseWriter, r *http.Request) {
http.Error(w, "", http.StatusInternalServerError) http.Error(w, "", http.StatusInternalServerError)
return return
} }
watcher, err := lc.WatchIPNBus(r.Context(), ipn.NotifyInitialNetMap) st, err := lc.Status(r.Context())
if err != nil { if err != nil {
log.Printf("monitor: error WatchIPNBus: %v", err) log.Printf("monitor: error fetching status: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
defer watcher.Close()
n, err := watcher.Next()
if err != nil {
log.Printf("monitor: error watcher.Next: %v", err)
http.Error(w, "", http.StatusInternalServerError) http.Error(w, "", http.StatusInternalServerError)
return return
} }
encoder := json.NewEncoder(w) encoder := json.NewEncoder(w)
encoder.SetIndent("", "\t") encoder.SetIndent("", "\t")
if err := encoder.Encode(n); err != nil { if err := encoder.Encode(st); err != nil {
log.Printf("monitor: error encoding netmap: %v", err) log.Printf("monitor: error encoding status: %v", err)
return return
} }
} }