cmd/{containerboot,k8s-operator}: reissue auth keys for broken proxies (#16450)
Adds logic for containerboot to signal that it can't auth, so that the
operator can issue a new auth key. This only applies when running with
a config file and a kube state store.
If the operator sees reissue_authkey in a state Secret, it will create a
new auth key iff the config has no auth key or its auth key matches the
value of reissue_authkey from the state Secret. This is to ensure we
don't reissue auth keys in a tight loop if the proxy is slow to start or
failing for some other reason. The reissue logic also uses a burstable
rate limiter to ensure there's no way a terminally misconfigured
or buggy operator can automatically generate new auth keys in a tight loop.
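The limiter described here is the standard `golang.org/x/time/rate` token bucket that the operator code below constructs per ProxyGroup. A minimal standalone sketch of that behaviour, assuming a burst sized to the replica count and one token every 30 seconds:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	replicas := 3 // stand-in for pgReplicas(pg)

	// One token every 30s, with an initial burst equal to the replica
	// count, mirroring the per-ProxyGroup limiter the operator creates.
	lim := rate.NewLimiter(rate.Every(30*time.Second), replicas)

	for i := 0; i <= replicas; i++ {
		// Allow consumes a token if one is available, without blocking.
		fmt.Printf("reissue %d allowed: %v\n", i, lim.Allow())
	}
	// The first `replicas` calls succeed; afterwards Allow only returns
	// true again as tokens refill at the 30-second rate.
}
```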
Additional implementation details (ChaosInTheCRD):
- Added `ipn.NotifyInitialHealthState` to the ipn watcher, to ensure that
  `n.Health` is populated when Notify messages are returned.
- On auth failure, containerboot (sketched below):
  - Disconnects from the control server
  - Sets the reissue_authkey marker in the state Secret with the failing key
  - Polls the config file for a new auth key (10 minute timeout)
  - Restarts after receiving the new key to apply it
- Modified the operator's reissue logic slightly:
  - Deletes the old device from the tailnet before creating a new key
  - Rate limiting: 1 key per 30s, with an initial burst equal to the replica count
  - In-flight tracking (the authKeyReissuing map) prevents duplicate API calls
    across reconcile loops
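A rough, self-contained sketch of the containerboot-side polling step described above. This is an illustration under stated assumptions, not the actual containerboot code: the function and type names are hypothetical, and only the `AuthKey` config field name is taken from the tailscaled config schema.

```go
package sketch

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"
)

// confFile mirrors just the one field this sketch needs; the real
// tailscaled config file has many more fields.
type confFile struct {
	AuthKey *string `json:"AuthKey"`
}

// waitForNewAuthKey (hypothetical) polls the config file until the
// operator swaps in a key different from the one that failed, giving
// up after the 10 minute timeout described above.
func waitForNewAuthKey(ctx context.Context, path, brokenKey string) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	tick := time.NewTicker(5 * time.Second) // poll interval is illustrative
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return "", fmt.Errorf("timed out waiting for a fresh auth key: %w", ctx.Err())
		case <-tick.C:
			raw, err := os.ReadFile(path)
			if err != nil {
				return "", err
			}
			var cf confFile
			if err := json.Unmarshal(raw, &cf); err != nil {
				return "", err
			}
			// A different, non-empty key means the operator has
			// reissued; containerboot would then restart to apply it.
			if cf.AuthKey != nil && *cf.AuthKey != "" && *cf.AuthKey != brokenKey {
				return *cf.AuthKey, nil
			}
		}
	}
}
```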
Updates #14080
Change-Id: I6982f8e741932a6891f2f48a2936f7f6a455317f
(cherry picked from commit 969927c47c3d4de05e90f5b26a6d8d931c5ceed4)
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
Co-authored-by: chaosinthecrd <tom@tmlabs.co.uk>
+166 -50
@@ -16,10 +16,12 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"time"
 
 	dockerref "github.com/distribution/reference"
 	"go.uber.org/zap"
 	xslices "golang.org/x/exp/slices"
+	"golang.org/x/time/rate"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -94,10 +96,12 @@ type ProxyGroupReconciler struct {
 	defaultProxyClass string
 	loginServer       string
 
-	mu                   sync.Mutex           // protects following
-	egressProxyGroups    set.Slice[types.UID] // for egress proxygroups gauge
-	ingressProxyGroups   set.Slice[types.UID] // for ingress proxygroups gauge
-	apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge
+	mu                   sync.Mutex               // protects following
+	egressProxyGroups    set.Slice[types.UID]     // for egress proxygroups gauge
+	ingressProxyGroups   set.Slice[types.UID]     // for ingress proxygroups gauge
+	apiServerProxyGroups set.Slice[types.UID]     // for kube-apiserver proxygroups gauge
+	authKeyRateLimits    map[string]*rate.Limiter // per-ProxyGroup rate limiters for auth key re-issuance.
+	authKeyReissuing     map[string]bool
 }
 
 func (r *ProxyGroupReconciler) logger(name string) *zap.SugaredLogger {
@@ -294,7 +298,7 @@ func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGrou
 func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, tailscaleClient tsClient, loginUrl string, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (map[string][]netip.AddrPort, *notReadyReason, error) {
 	logger := r.logger(pg.Name)
 	r.mu.Lock()
-	r.ensureAddedToGaugeForProxyGroup(pg)
+	r.ensureStateAddedForProxyGroup(pg)
 	r.mu.Unlock()
 
 	svcToNodePorts := make(map[string]uint16)
@@ -629,13 +633,13 @@ func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, tai
 	}
 
 	for _, m := range metadata {
-		if m.ordinal+1 <= int(pgReplicas(pg)) {
+		if m.ordinal+1 <= pgReplicas(pg) {
 			continue
 		}
 
 		// Dangling resource, delete the config + state Secrets, as well as
 		// deleting the device from the tailnet.
-		if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil {
+		if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil {
 			return err
 		}
 		if err := r.Delete(ctx, m.stateSecret); err != nil && !apierrors.IsNotFound(err) {
@@ -687,7 +691,7 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient
 	}
 
 	for _, m := range metadata {
-		if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil {
+		if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil {
 			return false, err
 		}
 	}
@@ -703,12 +707,12 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient
 
 	logger.Infof("cleaned up ProxyGroup resources")
 	r.mu.Lock()
-	r.ensureRemovedFromGaugeForProxyGroup(pg)
+	r.ensureStateRemovedForProxyGroup(pg)
 	r.mu.Unlock()
 	return true, nil
 }
 
-func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
+func (r *ProxyGroupReconciler) ensureDeviceDeleted(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
 	logger.Debugf("deleting device %s from control", string(id))
 	if err := tailscaleClient.DeleteDevice(ctx, string(id)); err != nil {
 		if errResp, ok := errors.AsType[tailscale.ErrResponse](err); ok && errResp.Status == http.StatusNotFound {
@@ -734,6 +738,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
 	logger := r.logger(pg.Name)
 	endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg)) // keyed by Service name.
 	for i := range pgReplicas(pg) {
+		logger = logger.With("Pod", fmt.Sprintf("%s-%d", pg.Name, i))
 		cfgSecret := &corev1.Secret{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      pgConfigSecretName(pg.Name, i),
@@ -751,38 +756,9 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
 			return nil, err
 		}
 
-		var authKey *string
-		if existingCfgSecret == nil {
-			logger.Debugf("Creating authkey for new ProxyGroup proxy")
-			tags := pg.Spec.Tags.Stringify()
-			if len(tags) == 0 {
-				tags = r.defaultTags
-			}
-			key, err := newAuthKey(ctx, tailscaleClient, tags)
-			if err != nil {
-				return nil, err
-			}
-			authKey = &key
-		}
-
-		if authKey == nil {
-			// Get state Secret to check if it's already authed.
-			stateSecret := &corev1.Secret{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:      pgStateSecretName(pg.Name, i),
-					Namespace: r.tsNamespace,
-				},
-			}
-			if err = r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) {
-				return nil, err
-			}
-
-			if shouldRetainAuthKey(stateSecret) && existingCfgSecret != nil {
-				authKey, err = authKeyFromSecret(existingCfgSecret)
-				if err != nil {
-					return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err)
-				}
-			}
-		}
+		authKey, err := r.getAuthKey(ctx, tailscaleClient, pg, existingCfgSecret, i, logger)
+		if err != nil {
+			return nil, err
+		}
 
 		nodePortSvcName := pgNodePortServiceName(pg.Name, i)
@@ -918,11 +894,137 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
 				return nil, err
 			}
 		}
 	}
 
 	return endpoints, nil
 }
 
+// getAuthKey returns an auth key for the proxy, or nil if none is needed.
+// A new key is created if the config Secret doesn't exist yet, or if the
+// proxy has requested a reissue via its state Secret. An existing key is
+// retained while the device hasn't authed or a reissue is in progress.
+func (r *ProxyGroupReconciler) getAuthKey(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, existingCfgSecret *corev1.Secret, ordinal int32, logger *zap.SugaredLogger) (*string, error) {
+	// Get state Secret to check if it's already authed or has requested
+	// a fresh auth key.
+	stateSecret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pgStateSecretName(pg.Name, ordinal),
+			Namespace: r.tsNamespace,
+		},
+	}
+	if err := r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) {
+		return nil, err
+	}
+
+	var createAuthKey bool
+	var cfgAuthKey *string
+	if existingCfgSecret == nil {
+		createAuthKey = true
+	} else {
+		var err error
+		cfgAuthKey, err = authKeyFromSecret(existingCfgSecret)
+		if err != nil {
+			return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err)
+		}
+	}
+
+	if !createAuthKey {
+		var err error
+		createAuthKey, err = r.shouldReissueAuthKey(ctx, tailscaleClient, pg, stateSecret, cfgAuthKey)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	var authKey *string
+	if createAuthKey {
+		logger.Debugf("creating auth key for ProxyGroup proxy %q", stateSecret.Name)
+
+		tags := pg.Spec.Tags.Stringify()
+		if len(tags) == 0 {
+			tags = r.defaultTags
+		}
+		key, err := newAuthKey(ctx, tailscaleClient, tags)
+		if err != nil {
+			return nil, err
+		}
+		authKey = &key
+	} else {
+		// Retain auth key if the device hasn't authed yet, or if a
+		// reissue is in progress (device_id is stale during reissue).
+		_, reissueRequested := stateSecret.Data[kubetypes.KeyReissueAuthkey]
+		if !deviceAuthed(stateSecret) || reissueRequested {
+			authKey = cfgAuthKey
+		}
+	}
+
+	return authKey, nil
+}
+
+// shouldReissueAuthKey returns true if the proxy needs a new auth key. It
+// tracks in-flight reissues via authKeyReissuing to avoid duplicate API calls
+// across reconciles.
+func (r *ProxyGroupReconciler) shouldReissueAuthKey(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, stateSecret *corev1.Secret, cfgAuthKey *string) (shouldReissue bool, err error) {
+	r.mu.Lock()
+	reissuing := r.authKeyReissuing[stateSecret.Name]
+	r.mu.Unlock()
+
+	if reissuing {
+		// Check if reissue is complete by seeing if the request was cleared.
+		_, requestStillPresent := stateSecret.Data[kubetypes.KeyReissueAuthkey]
+		if !requestStillPresent {
+			// Containerboot cleared the request; the reissue is complete.
+			r.mu.Lock()
+			r.authKeyReissuing[stateSecret.Name] = false
+			r.mu.Unlock()
+			r.log.Debugf("auth key reissue completed for %q", stateSecret.Name)
+			return false, nil
+		}
+
+		// Reissue still in-flight; waiting for containerboot to pick up new key.
+		r.log.Debugf("auth key already in process of re-issuance, waiting for secret to be updated")
+		return false, nil
+	}
+
+	defer func() {
+		r.mu.Lock()
+		r.authKeyReissuing[stateSecret.Name] = shouldReissue
+		r.mu.Unlock()
+	}()
+
+	brokenAuthkey, ok := stateSecret.Data[kubetypes.KeyReissueAuthkey]
+	if !ok {
+		// Reissue hasn't been requested, since the key in the Secret hasn't been populated.
+		return false, nil
+	}
+
+	empty := cfgAuthKey == nil || *cfgAuthKey == ""
+	broken := cfgAuthKey != nil && *cfgAuthKey == string(brokenAuthkey)
+
+	// A new key has been written but the proxy hasn't picked it up yet.
+	if !empty && !broken {
+		return false, nil
+	}
+
+	lim := r.authKeyRateLimits[pg.Name]
+	if !lim.Allow() {
+		r.log.Debugf("auth key re-issuance rate limit exceeded, limit: %.2f, burst: %d, tokens: %.2f",
+			lim.Limit(), lim.Burst(), lim.Tokens())
+		return false, fmt.Errorf("auth key re-issuance rate limit exceeded for ProxyGroup %q, will retry with backoff", pg.Name)
+	}
+
+	r.log.Infof("Proxy failing to auth; attempting cleanup and new key")
+	if tsID := stateSecret.Data[kubetypes.KeyDeviceID]; len(tsID) > 0 {
+		id := tailcfg.StableNodeID(tsID)
+		if err := r.ensureDeviceDeleted(ctx, tailscaleClient, id, r.log); err != nil {
+			return false, err
+		}
+	}
+
+	return true, nil
+}
+
 type FindStaticEndpointErr struct {
 	msg string
 }
@@ -1016,9 +1118,9 @@ func getStaticEndpointAddress(a *corev1.NodeAddress, port uint16) *netip.AddrPor
 	return new(netip.AddrPortFrom(addr, port))
 }
 
-// ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
-// is created. r.mu must be held.
-func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
+// ensureStateAddedForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
+// is created, and initialises per-ProxyGroup rate limits on re-issuing auth keys. r.mu must be held.
+func (r *ProxyGroupReconciler) ensureStateAddedForProxyGroup(pg *tsapi.ProxyGroup) {
 	switch pg.Spec.Type {
 	case tsapi.ProxyGroupTypeEgress:
 		r.egressProxyGroups.Add(pg.UID)
@@ -1030,11 +1132,24 @@ func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGr
 	gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
 	gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
 	gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
+
+	if _, ok := r.authKeyRateLimits[pg.Name]; !ok {
+		// Allow every replica to have its auth key re-issued quickly the first
+		// time, but with an overall limit of 1 every 30s after a burst.
+		r.authKeyRateLimits[pg.Name] = rate.NewLimiter(rate.Every(30*time.Second), int(pgReplicas(pg)))
+	}
+
+	for i := range pgReplicas(pg) {
+		rep := pgStateSecretName(pg.Name, i)
+		if _, ok := r.authKeyReissuing[rep]; !ok {
+			r.authKeyReissuing[rep] = false
+		}
+	}
 }
 
-// ensureRemovedFromGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the
-// ProxyGroup is deleted. r.mu must be held.
-func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
+// ensureStateRemovedForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the
+// ProxyGroup is deleted, and deletes the per-ProxyGroup rate limiter to free memory. r.mu must be held.
+func (r *ProxyGroupReconciler) ensureStateRemovedForProxyGroup(pg *tsapi.ProxyGroup) {
 	switch pg.Spec.Type {
 	case tsapi.ProxyGroupTypeEgress:
 		r.egressProxyGroups.Remove(pg.UID)
@@ -1046,6 +1161,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
 	gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
 	gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
 	gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
+	delete(r.authKeyRateLimits, pg.Name)
 }
 
 func pgTailscaledConfig(pg *tsapi.ProxyGroup, loginServer string, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string) (tailscaledConfigs, error) {
@@ -1106,7 +1222,7 @@ func getNodeMetadata(ctx context.Context, pg *tsapi.ProxyGroup, cl client.Client
 		return nil, fmt.Errorf("failed to list state Secrets: %w", err)
 	}
 	for _, secret := range secrets.Items {
-		var ordinal int
+		var ordinal int32
 		if _, err := fmt.Sscanf(secret.Name, pg.Name+"-%d", &ordinal); err != nil {
 			return nil, fmt.Errorf("unexpected secret %s was labelled as owned by the ProxyGroup %s: %w", secret.Name, pg.Name, err)
 		}
@@ -1213,7 +1329,7 @@ func (r *ProxyGroupReconciler) getClientAndLoginURL(ctx context.Context, tailnet
 }
 
 type nodeMetadata struct {
-	ordinal     int
+	ordinal     int32
 	stateSecret *corev1.Secret
 	podUID      string // or empty if the Pod no longer exists.
 	tsID        tailcfg.StableNodeID