WIP: rebase for 2026-05-18 #7
+28
-85
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"tailscale.com/client/local"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/authkey"
|
||||
"tailscale.com/kube/egressservices"
|
||||
"tailscale.com/kube/ingressservices"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
@@ -32,7 +33,6 @@ import (
|
||||
)
|
||||
|
||||
const fieldManager = "tailscale-container"
|
||||
const kubeletMountedConfigLn = "..data"
|
||||
|
||||
// kubeClient is a wrapper around Tailscale's internal kube client that knows how to talk to the kube API server. We use
|
||||
// this rather than any of the upstream Kubernetes client libraries to avoid extra imports.
|
||||
@@ -127,6 +127,9 @@ func (kc *kubeClient) deleteAuthKey(ctx context.Context) error {
|
||||
|
||||
// resetContainerbootState resets state from previous runs of containerboot to
|
||||
// ensure the operator doesn't use stale state when a Pod is first recreated.
|
||||
//
|
||||
// Device identity keys (device_id, device_fqdn, device_ips) are preserved so
|
||||
// the operator can clean up the old device from the control plane.
|
||||
func (kc *kubeClient) resetContainerbootState(ctx context.Context, podUID string, tailscaledConfigAuthkey string) error {
|
||||
existingSecret, err := kc.GetSecret(ctx, kc.stateSecret)
|
||||
switch {
|
||||
@@ -139,12 +142,7 @@ func (kc *kubeClient) resetContainerbootState(ctx context.Context, podUID string
|
||||
|
||||
s := &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
kubetypes.KeyCapVer: fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion),
|
||||
|
||||
// TODO(tomhjp): Perhaps shouldn't clear device ID and use a different signal, as this could leak tailnet devices.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyCapVer: fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion),
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
@@ -169,47 +167,18 @@ func (kc *kubeClient) setAndWaitForAuthKeyReissue(ctx context.Context, client *l
|
||||
return fmt.Errorf("error disconnecting from control: %w", err)
|
||||
}
|
||||
|
||||
err = kc.setReissueAuthKey(ctx, tailscaledConfigAuthKey)
|
||||
err = authkey.SetReissueAuthKey(ctx, kc.Client, kc.stateSecret, tailscaledConfigAuthKey, authkey.TailscaleContainerFieldManager)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set reissue_authkey in Kubernetes Secret: %w", err)
|
||||
}
|
||||
|
||||
err = kc.waitForAuthKeyReissue(ctx, cfg.TailscaledConfigFilePath, tailscaledConfigAuthKey, 10*time.Minute)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive new auth key: %w", err)
|
||||
clearFn := func(ctx context.Context) error {
|
||||
return authkey.ClearReissueAuthKey(ctx, kc.Client, kc.stateSecret, authkey.TailscaleContainerFieldManager)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kc *kubeClient) setReissueAuthKey(ctx context.Context, authKey string) error {
|
||||
s := &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: []byte(authKey),
|
||||
},
|
||||
}
|
||||
|
||||
log.Printf("Requesting a new auth key from operator")
|
||||
return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager)
|
||||
}
|
||||
|
||||
func (kc *kubeClient) waitForAuthKeyReissue(ctx context.Context, configPath string, oldAuthKey string, maxWait time.Duration) error {
|
||||
log.Printf("Waiting for operator to provide new auth key (max wait: %v)", maxWait)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, maxWait)
|
||||
defer cancel()
|
||||
|
||||
tailscaledCfgDir := filepath.Dir(configPath)
|
||||
toWatch := filepath.Join(tailscaledCfgDir, kubeletMountedConfigLn)
|
||||
|
||||
var (
|
||||
pollTicker <-chan time.Time
|
||||
eventChan <-chan fsnotify.Event
|
||||
)
|
||||
|
||||
pollInterval := 5 * time.Second
|
||||
|
||||
// Try to use fsnotify for faster notification
|
||||
getAuthKey := func() string { return authkey.AuthKeyFromConfig(cfg.TailscaledConfigFilePath) }
|
||||
tailscaledCfgDir := filepath.Dir(cfg.TailscaledConfigFilePath)
|
||||
var notify <-chan struct{}
|
||||
if w, err := fsnotify.NewWatcher(); err != nil {
|
||||
log.Printf("auth key reissue: fsnotify unavailable, using polling: %v", err)
|
||||
} else if err := w.Add(tailscaledCfgDir); err != nil {
|
||||
@@ -217,54 +186,28 @@ func (kc *kubeClient) waitForAuthKeyReissue(ctx context.Context, configPath stri
|
||||
log.Printf("auth key reissue: fsnotify watch failed, using polling: %v", err)
|
||||
} else {
|
||||
defer w.Close()
|
||||
ch := make(chan struct{}, 1)
|
||||
toWatch := filepath.Join(tailscaledCfgDir, "..data")
|
||||
go func() {
|
||||
for ev := range w.Events {
|
||||
if ev.Name == toWatch {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
notify = ch
|
||||
log.Printf("auth key reissue: watching for config changes via fsnotify")
|
||||
eventChan = w.Events
|
||||
}
|
||||
|
||||
// still keep polling if using fsnotify, for logging and in case fsnotify fails
|
||||
pt := time.NewTicker(pollInterval)
|
||||
defer pt.Stop()
|
||||
pollTicker = pt.C
|
||||
|
||||
start := time.Now()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("timeout waiting for auth key reissue after %v", maxWait)
|
||||
case <-pollTicker: // Waits for polling tick, continues when received
|
||||
case event := <-eventChan:
|
||||
if event.Name != toWatch {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
newAuthKey := authkeyFromTailscaledConfig(configPath)
|
||||
if newAuthKey != "" && newAuthKey != oldAuthKey {
|
||||
log.Printf("New auth key received from operator after %v", time.Since(start).Round(time.Second))
|
||||
|
||||
if err := kc.clearReissueAuthKeyRequest(ctx); err != nil {
|
||||
log.Printf("Warning: failed to clear reissue request: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if eventChan == nil && pollTicker != nil {
|
||||
log.Printf("Waiting for new auth key from operator (%v elapsed)", time.Since(start).Round(time.Second))
|
||||
}
|
||||
err = authkey.WaitForAuthKeyReissue(ctx, tailscaledConfigAuthKey, 10*time.Minute, getAuthKey, clearFn, notify)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive new auth key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// clearReissueAuthKeyRequest removes the reissue_authkey marker from the Secret
|
||||
// to signal to the operator that we've successfully received the new key.
|
||||
func (kc *kubeClient) clearReissueAuthKeyRequest(ctx context.Context) error {
|
||||
s := &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: nil,
|
||||
},
|
||||
}
|
||||
return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager)
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForConsistentState waits for tailscaled to finish writing state if it
|
||||
|
||||
@@ -257,12 +257,8 @@ func TestResetContainerbootState(t *testing.T) {
|
||||
authkey: "new-authkey",
|
||||
initial: map[string][]byte{},
|
||||
expected: map[string][]byte{
|
||||
kubetypes.KeyCapVer: capver,
|
||||
kubetypes.KeyPodUID: []byte("1234"),
|
||||
// Cleared keys.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyCapVer: capver,
|
||||
kubetypes.KeyPodUID: []byte("1234"),
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
@@ -271,11 +267,7 @@ func TestResetContainerbootState(t *testing.T) {
|
||||
"empty_initial_no_pod_uid": {
|
||||
initial: map[string][]byte{},
|
||||
expected: map[string][]byte{
|
||||
kubetypes.KeyCapVer: capver,
|
||||
// Cleared keys.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyCapVer: capver,
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
@@ -303,9 +295,6 @@ func TestResetContainerbootState(t *testing.T) {
|
||||
kubetypes.KeyCapVer: capver,
|
||||
kubetypes.KeyPodUID: []byte("1234"),
|
||||
// Cleared keys.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
@@ -321,9 +310,6 @@ func TestResetContainerbootState(t *testing.T) {
|
||||
kubetypes.KeyCapVer: capver,
|
||||
kubetypes.KeyReissueAuthkey: nil,
|
||||
// Cleared keys.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
@@ -338,9 +324,6 @@ func TestResetContainerbootState(t *testing.T) {
|
||||
kubetypes.KeyCapVer: capver,
|
||||
// reissue_authkey not cleared.
|
||||
// Cleared keys.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
@@ -355,9 +338,6 @@ func TestResetContainerbootState(t *testing.T) {
|
||||
kubetypes.KeyCapVer: capver,
|
||||
// reissue_authkey not cleared.
|
||||
// Cleared keys.
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
kubetypes.KeyHTTPSEndpoint: nil,
|
||||
egressservices.KeyEgressServices: nil,
|
||||
ingressservices.IngressConfigKey: nil,
|
||||
|
||||
@@ -139,8 +139,8 @@ import (
|
||||
|
||||
"tailscale.com/health"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/conffile"
|
||||
kubeutils "tailscale.com/k8s-operator"
|
||||
"tailscale.com/kube/authkey"
|
||||
healthz "tailscale.com/kube/health"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
klc "tailscale.com/kube/localclient"
|
||||
@@ -209,7 +209,7 @@ func run() error {
|
||||
|
||||
var tailscaledConfigAuthkey string
|
||||
if isOneStepConfig(cfg) {
|
||||
tailscaledConfigAuthkey = authkeyFromTailscaledConfig(cfg.TailscaledConfigFilePath)
|
||||
tailscaledConfigAuthkey = authkey.AuthKeyFromConfig(cfg.TailscaledConfigFilePath)
|
||||
}
|
||||
|
||||
var kc *kubeClient
|
||||
@@ -374,7 +374,7 @@ authLoop:
|
||||
if hasKubeStateStore(cfg) {
|
||||
log.Printf("Auth key missing or invalid (NeedsLogin state), disconnecting from control and requesting new key from operator")
|
||||
|
||||
err := kc.setAndWaitForAuthKeyReissue(bootCtx, client, cfg, tailscaledConfigAuthkey)
|
||||
err := kc.setAndWaitForAuthKeyReissue(ctx, client, cfg, tailscaledConfigAuthkey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get a reissued authkey: %w", err)
|
||||
}
|
||||
@@ -414,7 +414,7 @@ authLoop:
|
||||
if isOneStepConfig(cfg) && hasKubeStateStore(cfg) {
|
||||
log.Printf("Auth key failed to authenticate (may be expired or single-use), disconnecting from control and requesting new key from operator")
|
||||
|
||||
err := kc.setAndWaitForAuthKeyReissue(bootCtx, client, cfg, tailscaledConfigAuthkey)
|
||||
err := kc.setAndWaitForAuthKeyReissue(ctx, client, cfg, tailscaledConfigAuthkey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get a reissued authkey: %w", err)
|
||||
}
|
||||
@@ -1024,11 +1024,3 @@ func serviceIPsFromNetMap(nm *netmap.NetworkMap, fqdn dnsname.FQDN) []netip.Pref
|
||||
|
||||
return prefixes
|
||||
}
|
||||
|
||||
func authkeyFromTailscaledConfig(path string) string {
|
||||
if cfg, err := conffile.Load(path); err == nil && cfg.Parsed.AuthKey != nil {
|
||||
return *cfg.Parsed.AuthKey
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/utils/strings/slices"
|
||||
"tailscale.com/client/local"
|
||||
"tailscale.com/cmd/k8s-proxy/internal/config"
|
||||
"tailscale.com/health"
|
||||
"tailscale.com/hostinfo"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/store"
|
||||
@@ -41,6 +42,7 @@ import (
|
||||
"tailscale.com/kube/certs"
|
||||
healthz "tailscale.com/kube/health"
|
||||
"tailscale.com/kube/k8s-proxy/conf"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
klc "tailscale.com/kube/localclient"
|
||||
"tailscale.com/kube/metrics"
|
||||
@@ -171,10 +173,31 @@ func run(logger *zap.SugaredLogger) error {
|
||||
|
||||
// If Pod UID unset, assume we're running outside of a cluster/not managed
|
||||
// by the operator, so no need to set additional state keys.
|
||||
var kc kubeclient.Client
|
||||
var stateSecretName string
|
||||
if podUID != "" {
|
||||
if err := state.SetInitialKeys(st, podUID); err != nil {
|
||||
return fmt.Errorf("error setting initial state: %w", err)
|
||||
}
|
||||
|
||||
if cfg.Parsed.State != nil {
|
||||
if name, ok := strings.CutPrefix(*cfg.Parsed.State, "kube:"); ok {
|
||||
stateSecretName = name
|
||||
|
||||
kc, err = kubeclient.New(k8sProxyFieldManager)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var configAuthKey string
|
||||
if cfg.Parsed.AuthKey != nil {
|
||||
configAuthKey = *cfg.Parsed.AuthKey
|
||||
}
|
||||
if err := resetState(ctx, kc, stateSecretName, podUID, configAuthKey); err != nil {
|
||||
return fmt.Errorf("error resetting state: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var authKey string
|
||||
@@ -197,23 +220,69 @@ func run(logger *zap.SugaredLogger) error {
|
||||
ts.Hostname = *cfg.Parsed.Hostname
|
||||
}
|
||||
|
||||
// Make sure we crash loop if Up doesn't complete in reasonable time.
|
||||
upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer upCancel()
|
||||
if _, err := ts.Up(upCtx); err != nil {
|
||||
return fmt.Errorf("error starting tailscale server: %w", err)
|
||||
}
|
||||
defer ts.Close()
|
||||
lc, err := ts.LocalClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting local client: %w", err)
|
||||
}
|
||||
|
||||
// Setup for updating state keys.
|
||||
// Make sure we crash loop if Up doesn't complete in reasonable time.
|
||||
upCtx, upCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer upCancel()
|
||||
|
||||
// ts.Up() deliberately ignores NeedsLogin because it fires transiently
|
||||
// during normal auth-key login. We can watch for the login-state health
|
||||
// warning here though, which only fires on terminal auth failure, and
|
||||
// cancel early.
|
||||
go func() {
|
||||
w, err := lc.WatchIPNBus(upCtx, ipn.NotifyInitialHealthState)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer w.Close()
|
||||
for {
|
||||
n, err := w.Next()
|
||||
if err != nil {
|
||||
logger.Debugf("failed to process message from ipn bus: %s", err.Error())
|
||||
return
|
||||
}
|
||||
if n.Health != nil {
|
||||
if _, ok := n.Health.Warnings[health.LoginStateWarnable.Code]; ok {
|
||||
upCancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := ts.Up(upCtx); err != nil {
|
||||
if kc != nil && stateSecretName != "" {
|
||||
return handleAuthKeyReissue(ctx, lc, kc, stateSecretName, authKey, cfgChan, logger)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
defer ts.Close()
|
||||
|
||||
reissueCh := make(chan struct{}, 1)
|
||||
if podUID != "" {
|
||||
group.Go(func() error {
|
||||
return state.KeepKeysUpdated(ctx, st, klc.New(lc))
|
||||
})
|
||||
|
||||
if kc != nil && stateSecretName != "" {
|
||||
needsReissue, err := checkInitialAuthState(ctx, lc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error checking initial auth state: %w", err)
|
||||
}
|
||||
if needsReissue {
|
||||
logger.Info("Auth key missing or invalid after startup, requesting new key from operator")
|
||||
return handleAuthKeyReissue(ctx, lc, kc, stateSecretName, authKey, cfgChan, logger)
|
||||
}
|
||||
|
||||
group.Go(func() error {
|
||||
return monitorAuthHealth(ctx, lc, reissueCh, logger)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Parsed.HealthCheckEnabled.EqualBool(true) || cfg.Parsed.MetricsEnabled.EqualBool(true) {
|
||||
@@ -362,6 +431,8 @@ func run(logger *zap.SugaredLogger) error {
|
||||
}
|
||||
|
||||
cfgLogger.Infof("Config reloaded")
|
||||
case <-reissueCh:
|
||||
return handleAuthKeyReissue(ctx, lc, kc, stateSecretName, authKey, cfgChan, logger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,161 @@
|
||||
// Copyright (c) Tailscale Inc & contributors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"tailscale.com/client/local"
|
||||
"tailscale.com/health"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/authkey"
|
||||
"tailscale.com/kube/k8s-proxy/conf"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/tailcfg"
|
||||
)
|
||||
|
||||
// k8sProxyFieldManager is the field manager name that k8s-proxy passes to the
// kube client when it patches its state Secret.
const k8sProxyFieldManager = "tailscale-k8s-proxy"
|
||||
|
||||
// resetState clears k8s-proxy state from previous runs and sets
|
||||
// initial values. This ensures the operator doesn't use stale state when a Pod
|
||||
// is first recreated.
|
||||
//
|
||||
// It also clears the reissue_authkey marker if the operator has actioned it
|
||||
// (i.e., the config now has a different auth key than what was marked for
|
||||
// reissue).
|
||||
func resetState(ctx context.Context, kc kubeclient.Client, stateSecretName string, podUID string, configAuthKey string) error {
|
||||
existingSecret, err := kc.GetSecret(ctx, stateSecretName)
|
||||
switch {
|
||||
case kubeclient.IsNotFoundErr(err):
|
||||
return nil
|
||||
case err != nil:
|
||||
return fmt.Errorf("failed to read state Secret %q to reset state: %w", stateSecretName, err)
|
||||
}
|
||||
|
||||
s := &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
kubetypes.KeyCapVer: fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion),
|
||||
},
|
||||
}
|
||||
if podUID != "" {
|
||||
s.Data[kubetypes.KeyPodUID] = []byte(podUID)
|
||||
}
|
||||
|
||||
// Only clear reissue_authkey if the operator has actioned it.
|
||||
brokenAuthkey, ok := existingSecret.Data[kubetypes.KeyReissueAuthkey]
|
||||
if ok && configAuthKey != "" && string(brokenAuthkey) != configAuthKey {
|
||||
s.Data[kubetypes.KeyReissueAuthkey] = nil
|
||||
}
|
||||
|
||||
return kc.StrategicMergePatchSecret(ctx, stateSecretName, s, k8sProxyFieldManager)
|
||||
}
|
||||
|
||||
// needsAuthKeyReissue reports whether the given backend state and health
|
||||
// warnings indicate a terminal auth failure requiring a new key from the
|
||||
// operator.
|
||||
func needsAuthKeyReissue(backendState string, healthWarnings []string) bool {
|
||||
if backendState == ipn.NeedsLogin.String() {
|
||||
return true
|
||||
}
|
||||
loginWarnableCode := string(health.LoginStateWarnable.Code)
|
||||
for _, h := range healthWarnings {
|
||||
if strings.Contains(h, loginWarnableCode) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// checkInitialAuthState checks if the tsnet server is in an auth failure state
|
||||
// immediately after coming up. Returns true if auth key reissue is needed.
|
||||
func checkInitialAuthState(ctx context.Context, lc *local.Client) (bool, error) {
|
||||
status, err := lc.Status(ctx)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error getting status: %w", err)
|
||||
}
|
||||
return needsAuthKeyReissue(status.BackendState, status.Health), nil
|
||||
}
|
||||
|
||||
// monitorAuthHealth watches the IPN bus for auth failures and triggers reissue
|
||||
// when needed. Runs until context is cancelled or auth failure is detected.
|
||||
func monitorAuthHealth(ctx context.Context, lc *local.Client, reissueCh chan<- struct{}, logger *zap.SugaredLogger) error {
|
||||
w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialHealthState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch IPN bus for auth health: %w", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
n, err := w.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n.Health != nil {
|
||||
if _, ok := n.Health.Warnings[health.LoginStateWarnable.Code]; ok {
|
||||
logger.Info("Auth key failed to authenticate (may be expired or single-use), requesting new key from operator")
|
||||
select {
|
||||
case reissueCh <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleAuthKeyReissue orchestrates the auth key reissue flow:
|
||||
// 1. Disconnect from control
|
||||
// 2. Set reissue marker in state Secret
|
||||
// 3. Wait for operator to provide new key
|
||||
// 4. Exit cleanly (Kubernetes will restart the pod with the new key)
|
||||
func handleAuthKeyReissue(ctx context.Context, lc *local.Client, kc kubeclient.Client, stateSecretName string, currentAuthKey string, cfgChan <-chan *conf.Config, logger *zap.SugaredLogger) error {
|
||||
if err := lc.DisconnectControl(ctx); err != nil {
|
||||
return fmt.Errorf("error disconnecting from control: %w", err)
|
||||
}
|
||||
if err := authkey.SetReissueAuthKey(ctx, kc, stateSecretName, currentAuthKey, k8sProxyFieldManager); err != nil {
|
||||
return fmt.Errorf("failed to set reissue_authkey in Kubernetes Secret: %w", err)
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
var latestAuthKey string
|
||||
notify := make(chan struct{}, 1)
|
||||
|
||||
// we use this go func to abstract away conf.Config from the shared function
|
||||
go func() {
|
||||
for cfg := range cfgChan {
|
||||
if cfg.Parsed.AuthKey != nil {
|
||||
mu.Lock()
|
||||
latestAuthKey = *cfg.Parsed.AuthKey
|
||||
mu.Unlock()
|
||||
select {
|
||||
case notify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
getAuthKey := func() string {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return latestAuthKey
|
||||
}
|
||||
clearFn := func(ctx context.Context) error {
|
||||
return authkey.ClearReissueAuthKey(ctx, kc, stateSecretName, k8sProxyFieldManager)
|
||||
}
|
||||
|
||||
return authkey.WaitForAuthKeyReissue(ctx, currentAuthKey, 10*time.Minute, getAuthKey, clearFn, notify)
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
// Copyright (c) Tailscale Inc & contributors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/health"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/tailcfg"
|
||||
)
|
||||
|
||||
func TestResetState(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
existingData map[string][]byte
|
||||
podUID string
|
||||
configAuthKey string
|
||||
wantPatched map[string][]byte
|
||||
}{
|
||||
{
|
||||
name: "sets_capver_and_pod_uid",
|
||||
existingData: map[string][]byte{
|
||||
kubetypes.KeyDeviceID: []byte("device-123"),
|
||||
kubetypes.KeyDeviceFQDN: []byte("node.tailnet"),
|
||||
kubetypes.KeyDeviceIPs: []byte(`["100.64.0.1"]`),
|
||||
},
|
||||
podUID: "pod-123",
|
||||
configAuthKey: "new-key",
|
||||
wantPatched: map[string][]byte{
|
||||
kubetypes.KeyPodUID: []byte("pod-123"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "clears_reissue_marker_when_actioned",
|
||||
existingData: map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: []byte("old-key"),
|
||||
},
|
||||
podUID: "pod-123",
|
||||
configAuthKey: "new-key",
|
||||
wantPatched: map[string][]byte{
|
||||
kubetypes.KeyPodUID: []byte("pod-123"),
|
||||
kubetypes.KeyReissueAuthkey: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "keeps_reissue_marker_when_not_actioned",
|
||||
existingData: map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: []byte("old-key"),
|
||||
},
|
||||
podUID: "pod-123",
|
||||
configAuthKey: "old-key",
|
||||
wantPatched: map[string][]byte{
|
||||
kubetypes.KeyPodUID: []byte("pod-123"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
tt.wantPatched[kubetypes.KeyCapVer] = fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion)
|
||||
|
||||
var patched map[string][]byte
|
||||
kc := &kubeclient.FakeClient{
|
||||
GetSecretImpl: func(ctx context.Context, name string) (*kubeapi.Secret, error) {
|
||||
return &kubeapi.Secret{Data: tt.existingData}, nil
|
||||
},
|
||||
StrategicMergePatchSecretImpl: func(ctx context.Context, name string, s *kubeapi.Secret, fm string) error {
|
||||
patched = s.Data
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err := resetState(context.Background(), kc, "test-secret", tt.podUID, tt.configAuthKey)
|
||||
if err != nil {
|
||||
t.Fatalf("resetState() error = %v", err)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tt.wantPatched, patched); diff != "" {
|
||||
t.Errorf("resetState() mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNeedsAuthKeyReissue(t *testing.T) {
|
||||
loginWarnableCode := string(health.LoginStateWarnable.Code)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
backendState string
|
||||
health []string
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "running_healthy",
|
||||
backendState: "Running",
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "needs_login",
|
||||
backendState: "NeedsLogin",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "running_with_login_warning",
|
||||
backendState: "Running",
|
||||
health: []string{"warning: " + loginWarnableCode + ": you are logged out"},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "running_with_unrelated_warning",
|
||||
backendState: "Running",
|
||||
health: []string{"dns-not-working"},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "running_no_warnings",
|
||||
backendState: "Running",
|
||||
health: nil,
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := needsAuthKeyReissue(tt.backendState, tt.health)
|
||||
if got != tt.want {
|
||||
t.Errorf("needsAuthKeyReissue() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
// Copyright (c) Tailscale Inc & contributors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
// Package authkey provides shared logic for handling auth key reissue
|
||||
// requests between tailnet clients (containerboot, k8s-proxy) and the
|
||||
// operator.
|
||||
//
|
||||
// When a client fails to authenticate (expired key, single-use key already
|
||||
// used), it signals the operator by setting a marker in its state Secret.
|
||||
// The operator responds by deleting the old device and issuing a new auth
|
||||
// key. The client watches for the new key and restarts to apply it.
|
||||
package authkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/conffile"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
)
|
||||
|
||||
const (
	// TailscaleContainerFieldManager is the field manager name that
	// containerboot passes when patching its state Secret through the
	// helpers in this package.
	TailscaleContainerFieldManager = "tailscale-container"
)
|
||||
|
||||
// SetReissueAuthKey sets the reissue_authkey marker in the state Secret to
|
||||
// signal to the operator that a new auth key is needed. The marker value is
|
||||
// the auth key that failed to authenticate.
|
||||
func SetReissueAuthKey(ctx context.Context, kc kubeclient.Client, stateSecretName string, authKey string, fieldManager string) error {
|
||||
s := &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: []byte(authKey),
|
||||
},
|
||||
}
|
||||
|
||||
log.Printf("Requesting a new auth key from operator")
|
||||
return kc.StrategicMergePatchSecret(ctx, stateSecretName, s, fieldManager)
|
||||
}
|
||||
|
||||
// ClearReissueAuthKey removes the reissue_authkey marker from the state Secret
|
||||
// to signal to the operator that we've successfully received the new key.
|
||||
func ClearReissueAuthKey(ctx context.Context, kc kubeclient.Client, stateSecretName string, fieldManager string) error {
|
||||
existing, err := kc.GetSecret(ctx, stateSecretName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting state secret: %w", err)
|
||||
}
|
||||
|
||||
s := &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: nil,
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
string(ipn.MachineKeyStateKey): nil,
|
||||
string(ipn.CurrentProfileStateKey): nil,
|
||||
string(ipn.KnownProfilesStateKey): nil,
|
||||
},
|
||||
}
|
||||
|
||||
if profileKey := string(existing.Data[string(ipn.CurrentProfileStateKey)]); profileKey != "" {
|
||||
s.Data[profileKey] = nil
|
||||
}
|
||||
|
||||
return kc.StrategicMergePatchSecret(ctx, stateSecretName, s, fieldManager)
|
||||
}
|
||||
|
||||
// WaitForAuthKeyReissue blocks until getAuthKey returns a non-empty auth key
// that differs from oldAuthKey, or until waiting is no longer possible.
//
// getAuthKey is re-evaluated on every tick of a 5 second poll ticker and, if
// notify is non-nil, whenever a value is received on notify. A nil notify
// never fires, leaving only periodic polling. When a new key is detected,
// clearFn is called to clear the reissue marker from the state Secret; a
// clearFn failure is logged and otherwise ignored, and nil is returned.
//
// If maxWait elapses first, a timeout error is returned. If ctx itself is done
// first, the returned error wraps ctx.Err(), so callers can tell cancellation
// apart from the reissue timeout with errors.Is.
func WaitForAuthKeyReissue(ctx context.Context, oldAuthKey string, maxWait time.Duration, getAuthKey func() string, clearFn func(context.Context) error, notify <-chan struct{}) error {
	log.Printf("Waiting for operator to provide new auth key (max wait: %v)", maxWait)

	// Keep the caller's ctx distinct from the bounded one so that the reason
	// for stopping can be determined below.
	waitCtx, cancel := context.WithTimeout(ctx, maxWait)
	defer cancel()

	const pollInterval = 5 * time.Second
	pt := time.NewTicker(pollInterval)
	defer pt.Stop()

	start := time.Now()

	for {
		select {
		case <-waitCtx.Done():
			// If the caller's ctx is done, this is not our own timeout.
			if err := ctx.Err(); err != nil {
				return fmt.Errorf("waiting for auth key reissue: %w", err)
			}
			return fmt.Errorf("timeout waiting for auth key reissue after %v", maxWait)
		case <-pt.C:
		case <-notify:
		}

		newAuthKey := getAuthKey()
		if newAuthKey != "" && newAuthKey != oldAuthKey {
			log.Printf("New auth key received from operator after %v", time.Since(start).Round(time.Second))
			// Clearing the marker is best effort: the new key is already here.
			if err := clearFn(waitCtx); err != nil {
				log.Printf("Warning: failed to clear reissue request: %v", err)
			}
			return nil
		}

		// Progress is only logged when polling is the sole wake-up source.
		if notify == nil {
			log.Printf("Waiting for new auth key from operator (%v elapsed)", time.Since(start).Round(time.Second))
		}
	}
}
|
||||
|
||||
// AuthKeyFromConfig extracts the auth key from a tailscaled config file.
|
||||
// Returns empty string if the file cannot be read or contains no auth key.
|
||||
func AuthKeyFromConfig(path string) string {
|
||||
if cfg, err := conffile.Load(path); err == nil && cfg.Parsed.AuthKey != nil {
|
||||
return *cfg.Parsed.AuthKey
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
// Copyright (c) Tailscale Inc & contributors
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package authkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
)
|
||||
|
||||
func TestSetReissueAuthKey(t *testing.T) {
|
||||
var patched map[string][]byte
|
||||
kc := &kubeclient.FakeClient{
|
||||
StrategicMergePatchSecretImpl: func(ctx context.Context, name string, secret *kubeapi.Secret, _ string) error {
|
||||
patched = secret.Data
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err := SetReissueAuthKey(context.Background(), kc, "test-secret", "old-auth-key", TailscaleContainerFieldManager)
|
||||
if err != nil {
|
||||
t.Fatalf("SetReissueAuthKey() error = %v", err)
|
||||
}
|
||||
|
||||
want := map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: []byte("old-auth-key"),
|
||||
}
|
||||
if diff := cmp.Diff(want, patched); diff != "" {
|
||||
t.Errorf("SetReissueAuthKey() mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClearReissueAuthKey(t *testing.T) {
|
||||
var patched map[string][]byte
|
||||
kc := &kubeclient.FakeClient{
|
||||
GetSecretImpl: func(ctx context.Context, name string) (*kubeapi.Secret, error) {
|
||||
return &kubeapi.Secret{
|
||||
Data: map[string][]byte{
|
||||
"_current-profile": []byte("profile-abc1"),
|
||||
"profile-abc1": []byte("some-profile-data"),
|
||||
"_machinekey": []byte("machine-key-data"),
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
StrategicMergePatchSecretImpl: func(ctx context.Context, name string, secret *kubeapi.Secret, _ string) error {
|
||||
patched = secret.Data
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err := ClearReissueAuthKey(context.Background(), kc, "test-secret", TailscaleContainerFieldManager)
|
||||
if err != nil {
|
||||
t.Fatalf("ClearReissueAuthKey() error = %v", err)
|
||||
}
|
||||
|
||||
want := map[string][]byte{
|
||||
kubetypes.KeyReissueAuthkey: nil,
|
||||
kubetypes.KeyDeviceID: nil,
|
||||
kubetypes.KeyDeviceFQDN: nil,
|
||||
kubetypes.KeyDeviceIPs: nil,
|
||||
string(ipn.MachineKeyStateKey): nil,
|
||||
string(ipn.CurrentProfileStateKey): nil,
|
||||
string(ipn.KnownProfilesStateKey): nil,
|
||||
"profile-abc1": nil,
|
||||
}
|
||||
if diff := cmp.Diff(want, patched); diff != "" {
|
||||
t.Errorf("ClearReissueAuthKey() mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuthKeyFromConfig(t *testing.T) {
|
||||
for name, tc := range map[string]struct {
|
||||
configContent string
|
||||
want string
|
||||
}{
|
||||
"valid_config_with_authkey": {
|
||||
configContent: `{"Version":"alpha0","AuthKey":"test-auth-key"}`,
|
||||
want: "test-auth-key",
|
||||
},
|
||||
"valid_config_without_authkey": {
|
||||
configContent: `{"Version":"alpha0"}`,
|
||||
want: "",
|
||||
},
|
||||
"invalid_config": {
|
||||
configContent: `not valid json`,
|
||||
want: "",
|
||||
},
|
||||
"empty_config": {
|
||||
configContent: ``,
|
||||
want: "",
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
configPath := filepath.Join(tmpDir, "config.json")
|
||||
|
||||
if err := os.WriteFile(configPath, []byte(tc.configContent), 0600); err != nil {
|
||||
t.Fatalf("failed to write config file: %v", err)
|
||||
}
|
||||
|
||||
got := AuthKeyFromConfig(configPath)
|
||||
if got != tc.want {
|
||||
t.Errorf("AuthKeyFromConfig() = %q, want %q", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("nonexistent_file", func(t *testing.T) {
|
||||
got := AuthKeyFromConfig("/nonexistent/path/config.json")
|
||||
if got != "" {
|
||||
t.Errorf("AuthKeyFromConfig() = %q, want empty string for nonexistent file", got)
|
||||
}
|
||||
})
|
||||
}
|
||||
+1
-12
@@ -30,19 +30,8 @@ const (
|
||||
keyDeviceFQDN = ipn.StateKey(kubetypes.KeyDeviceFQDN)
|
||||
)
|
||||
|
||||
// SetInitialKeys sets Pod UID and cap ver and clears tailnet device state
|
||||
// keys to help stop the operator using stale tailnet device state.
|
||||
// SetInitialKeys sets Pod UID and cap ver.
|
||||
func SetInitialKeys(store ipn.StateStore, podUID string) error {
|
||||
// Clear device state keys first so the operator knows if the pod UID
|
||||
// matches, the other values are definitely not stale.
|
||||
for _, key := range []ipn.StateKey{keyDeviceID, keyDeviceFQDN, keyDeviceIPs} {
|
||||
if _, err := store.ReadState(key); err == nil {
|
||||
if err := store.WriteState(key, nil); err != nil {
|
||||
return fmt.Errorf("error writing %q to state store: %w", key, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := store.WriteState(keyPodUID, []byte(podUID)); err != nil {
|
||||
return fmt.Errorf("error writing pod UID to state store: %w", err)
|
||||
}
|
||||
|
||||
@@ -58,9 +58,9 @@ func TestSetInitialStateKeys(t *testing.T) {
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyPodUID: podUID,
|
||||
keyCapVer: expectedCapVer,
|
||||
keyDeviceID: nil,
|
||||
keyDeviceFQDN: nil,
|
||||
keyDeviceIPs: nil,
|
||||
keyDeviceID: []byte("existing-device-id"),
|
||||
keyDeviceFQDN: []byte("existing-device-fqdn"),
|
||||
keyDeviceIPs: []byte(`["1.2.3.4"]`),
|
||||
},
|
||||
},
|
||||
} {
|
||||
|
||||
Reference in New Issue
Block a user