cmd/k8s-operator: allow specifying replicas for connectors (#16721)

This commit adds a `replicas` field to the `Connector` custom resource that
allows users to specify the number of desired replicas deployed for their
connectors.

This allows users to deploy exit nodes, subnet routers and app connectors
in a highly available fashion.

Fixes #14020

Signed-off-by: David Bond <davidsbond93@gmail.com>
This commit is contained in:
David Bond
2025-09-02 13:10:03 +01:00
committed by GitHub
parent d05e6dc09e
commit 12ad630128
13 changed files with 665 additions and 202 deletions
+229 -133
View File
@@ -13,6 +13,7 @@ import (
"fmt"
"net/http"
"os"
"path"
"slices"
"strconv"
"strings"
@@ -20,6 +21,7 @@ import (
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -114,6 +116,7 @@ var (
)
type tailscaleSTSConfig struct {
Replicas int32
ParentResourceName string
ParentResourceUID string
ChildResourceLabels map[string]string
@@ -144,6 +147,10 @@ type tailscaleSTSConfig struct {
// LoginServer denotes the URL of the control plane that should be used by the proxy.
LoginServer string
// HostnamePrefix specifies the desired prefix for the device's hostname. The hostname will be suffixed with the
// ordinal number generated by the StatefulSet.
HostnamePrefix string
}
type connector struct {
@@ -205,11 +212,12 @@ func (a *tailscaleSTSReconciler) Provision(ctx context.Context, logger *zap.Suga
}
sts.ProxyClass = proxyClass
secretName, _, err := a.createOrGetSecret(ctx, logger, sts, hsvc)
secretNames, err := a.provisionSecrets(ctx, logger, sts, hsvc)
if err != nil {
return nil, fmt.Errorf("failed to create or get API key secret: %w", err)
}
_, err = a.reconcileSTS(ctx, logger, sts, hsvc, secretName)
_, err = a.reconcileSTS(ctx, logger, sts, hsvc, secretNames)
if err != nil {
return nil, fmt.Errorf("failed to reconcile statefulset: %w", err)
}
@@ -239,6 +247,7 @@ func (a *tailscaleSTSReconciler) Cleanup(ctx context.Context, logger *zap.Sugare
if err != nil {
return false, fmt.Errorf("getting statefulset: %w", err)
}
if sts != nil {
if !sts.GetDeletionTimestamp().IsZero() {
// Deletion in progress, check again later. We'll get another
@@ -246,29 +255,39 @@ func (a *tailscaleSTSReconciler) Cleanup(ctx context.Context, logger *zap.Sugare
logger.Debugf("waiting for statefulset %s/%s deletion", sts.GetNamespace(), sts.GetName())
return false, nil
}
err := a.DeleteAllOf(ctx, &appsv1.StatefulSet{}, client.InNamespace(a.operatorNamespace), client.MatchingLabels(labels), client.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil {
options := []client.DeleteAllOfOption{
client.InNamespace(a.operatorNamespace),
client.MatchingLabels(labels),
client.PropagationPolicy(metav1.DeletePropagationForeground),
}
if err = a.DeleteAllOf(ctx, &appsv1.StatefulSet{}, options...); err != nil {
return false, fmt.Errorf("deleting statefulset: %w", err)
}
logger.Debugf("started deletion of statefulset %s/%s", sts.GetNamespace(), sts.GetName())
return false, nil
}
dev, err := a.DeviceInfo(ctx, labels, logger)
devices, err := a.DeviceInfo(ctx, labels, logger)
if err != nil {
return false, fmt.Errorf("getting device info: %w", err)
}
if dev != nil && dev.id != "" {
logger.Debugf("deleting device %s from control", string(dev.id))
if err := a.tsClient.DeleteDevice(ctx, string(dev.id)); err != nil {
errResp := &tailscale.ErrResponse{}
if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", string(dev.id))
for _, dev := range devices {
if dev.id != "" {
logger.Debugf("deleting device %s from control", string(dev.id))
if err = a.tsClient.DeleteDevice(ctx, string(dev.id)); err != nil {
errResp := &tailscale.ErrResponse{}
if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", string(dev.id))
} else {
return false, fmt.Errorf("deleting device: %w", err)
}
} else {
return false, fmt.Errorf("deleting device: %w", err)
logger.Debugf("device %s deleted from control", string(dev.id))
}
} else {
logger.Debugf("device %s deleted from control", string(dev.id))
}
}
@@ -286,9 +305,10 @@ func (a *tailscaleSTSReconciler) Cleanup(ctx context.Context, logger *zap.Sugare
tsNamespace: a.operatorNamespace,
proxyType: typ,
}
if err := maybeCleanupMetricsResources(ctx, mo, a.Client); err != nil {
if err = maybeCleanupMetricsResources(ctx, mo, a.Client); err != nil {
return false, fmt.Errorf("error cleaning up metrics resources: %w", err)
}
return true, nil
}
@@ -339,91 +359,139 @@ func (a *tailscaleSTSReconciler) reconcileHeadlessService(ctx context.Context, l
return createOrUpdate(ctx, a.Client, a.operatorNamespace, hsvc, func(svc *corev1.Service) { svc.Spec = hsvc.Spec })
}
func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger *zap.SugaredLogger, stsC *tailscaleSTSConfig, hsvc *corev1.Service) (secretName string, configs tailscaledConfigs, _ error) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
// Hardcode a -0 suffix so that in future, if we support
// multiple StatefulSet replicas, we can provision -N for
// those.
Name: hsvc.Name + "-0",
Namespace: a.operatorNamespace,
Labels: stsC.ChildResourceLabels,
},
}
var orig *corev1.Secret // unmodified copy of secret
if err := a.Get(ctx, client.ObjectKeyFromObject(secret), secret); err == nil {
logger.Debugf("secret %s/%s already exists", secret.GetNamespace(), secret.GetName())
orig = secret.DeepCopy()
} else if !apierrors.IsNotFound(err) {
return "", nil, err
}
func (a *tailscaleSTSReconciler) provisionSecrets(ctx context.Context, logger *zap.SugaredLogger, stsC *tailscaleSTSConfig, hsvc *corev1.Service) ([]string, error) {
secretNames := make([]string, stsC.Replicas)
var authKey string
if orig == nil {
// Initially it contains only tailscaled config, but when the
// proxy starts, it will also store there the state, certs and
// ACME account key.
sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels)
// Start by ensuring we have Secrets for the desired number of replicas. This will handle both creating and scaling
// up a StatefulSet.
for i := range stsC.Replicas {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", hsvc.Name, i),
Namespace: a.operatorNamespace,
Labels: stsC.ChildResourceLabels,
},
}
// If we only have a single replica, use the hostname verbatim. Otherwise, use the hostname prefix and add
// an ordinal suffix.
hostname := stsC.Hostname
if stsC.HostnamePrefix != "" {
hostname = fmt.Sprintf("%s-%d", stsC.HostnamePrefix, i)
}
secretNames[i] = secret.Name
var orig *corev1.Secret // unmodified copy of secret
if err := a.Get(ctx, client.ObjectKeyFromObject(secret), secret); err == nil {
logger.Debugf("secret %s/%s already exists", secret.GetNamespace(), secret.GetName())
orig = secret.DeepCopy()
} else if !apierrors.IsNotFound(err) {
return nil, err
}
var (
authKey string
err error
)
if orig == nil {
// Create API Key secret which is going to be used by the statefulset
// to authenticate with Tailscale.
logger.Debugf("creating authkey for new tailscale proxy")
tags := stsC.Tags
if len(tags) == 0 {
tags = a.defaultTags
}
authKey, err = newAuthKey(ctx, a.tsClient, tags)
if err != nil {
return nil, err
}
}
configs, err := tailscaledConfig(stsC, authKey, orig, hostname)
if err != nil {
return "", nil, err
return nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
if sts != nil {
// StatefulSet exists, so we have already created the secret.
// If the secret is missing, they should delete the StatefulSet.
logger.Errorf("Tailscale proxy secret doesn't exist, but the corresponding StatefulSet %s/%s already does. Something is wrong, please delete the StatefulSet.", sts.GetNamespace(), sts.GetName())
return "", nil, nil
latest := tailcfg.CapabilityVersion(-1)
var latestConfig ipn.ConfigVAlpha
for key, val := range configs {
fn := tsoperator.TailscaledConfigFileName(key)
b, err := json.Marshal(val)
if err != nil {
return nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
}
mak.Set(&secret.StringData, fn, string(b))
if key > latest {
latest = key
latestConfig = val
}
}
// Create API Key secret which is going to be used by the statefulset
// to authenticate with Tailscale.
logger.Debugf("creating authkey for new tailscale proxy")
tags := stsC.Tags
if len(tags) == 0 {
tags = a.defaultTags
if stsC.ServeConfig != nil {
j, err := json.Marshal(stsC.ServeConfig)
if err != nil {
return nil, err
}
mak.Set(&secret.StringData, "serve-config", string(j))
}
authKey, err = newAuthKey(ctx, a.tsClient, tags)
if err != nil {
return "", nil, err
}
}
configs, err := tailscaledConfig(stsC, authKey, orig)
if err != nil {
return "", nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
latest := tailcfg.CapabilityVersion(-1)
var latestConfig ipn.ConfigVAlpha
for key, val := range configs {
fn := tsoperator.TailscaledConfigFileName(key)
b, err := json.Marshal(val)
if err != nil {
return "", nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
}
mak.Set(&secret.StringData, fn, string(b))
if key > latest {
latest = key
latestConfig = val
if orig != nil && !apiequality.Semantic.DeepEqual(latest, orig) {
logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err = a.Patch(ctx, secret, client.MergeFrom(orig)); err != nil {
return nil, err
}
} else {
logger.Debugf("creating a new Secret for the proxy with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err = a.Create(ctx, secret); err != nil {
return nil, err
}
}
}
if stsC.ServeConfig != nil {
j, err := json.Marshal(stsC.ServeConfig)
if err != nil {
return "", nil, err
}
mak.Set(&secret.StringData, "serve-config", string(j))
// Next, we check if we have additional secrets and remove them and their associated device. This happens when we
// scale an StatefulSet down.
var secrets corev1.SecretList
if err := a.List(ctx, &secrets, client.InNamespace(a.operatorNamespace), client.MatchingLabels(stsC.ChildResourceLabels)); err != nil {
return nil, err
}
if orig != nil {
logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err := a.Patch(ctx, secret, client.MergeFrom(orig)); err != nil {
return "", nil, err
for _, secret := range secrets.Items {
var ordinal int32
if _, err := fmt.Sscanf(secret.Name, hsvc.Name+"-%d", &ordinal); err != nil {
return nil, err
}
} else {
logger.Debugf("creating a new Secret for the proxy with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err := a.Create(ctx, secret); err != nil {
return "", nil, err
if ordinal < stsC.Replicas {
continue
}
dev, err := deviceInfo(&secret, "", logger)
if err != nil {
return nil, err
}
if dev != nil && dev.id != "" {
var errResp *tailscale.ErrResponse
err = a.tsClient.DeleteDevice(ctx, string(dev.id))
switch {
case errors.As(err, &errResp) && errResp.Status == http.StatusNotFound:
// This device has possibly already been deleted in the admin console. So we can ignore this
// and move on to removing the secret.
case err != nil:
return nil, err
}
}
if err = a.Delete(ctx, &secret); err != nil {
return nil, err
}
}
return secret.Name, configs, nil
return secretNames, nil
}
// sanitizeConfigBytes returns ipn.ConfigVAlpha in string form with redacted
@@ -443,22 +511,38 @@ func sanitizeConfigBytes(c ipn.ConfigVAlpha) string {
// It retrieves info from a Kubernetes Secret labeled with the provided labels. Capver is cross-validated against the
// Pod to ensure that it is the currently running Pod that set the capver. If the Pod or the Secret does not exist, the
// returned capver is -1. Either of device ID, hostname and IPs can be empty string if not found in the Secret.
func (a *tailscaleSTSReconciler) DeviceInfo(ctx context.Context, childLabels map[string]string, logger *zap.SugaredLogger) (dev *device, err error) {
sec, err := getSingleObject[corev1.Secret](ctx, a.Client, a.operatorNamespace, childLabels)
if err != nil {
return dev, err
func (a *tailscaleSTSReconciler) DeviceInfo(ctx context.Context, childLabels map[string]string, logger *zap.SugaredLogger) ([]*device, error) {
var secrets corev1.SecretList
if err := a.List(ctx, &secrets, client.InNamespace(a.operatorNamespace), client.MatchingLabels(childLabels)); err != nil {
return nil, err
}
if sec == nil {
return dev, nil
devices := make([]*device, 0)
for _, sec := range secrets.Items {
podUID := ""
pod := new(corev1.Pod)
err := a.Get(ctx, types.NamespacedName{Namespace: sec.Namespace, Name: sec.Name}, pod)
switch {
case apierrors.IsNotFound(err):
// If the Pod is not found, we won't have its UID. We can still get the device information but the
// capability version will be unknown.
case err != nil:
return nil, err
default:
podUID = string(pod.ObjectMeta.UID)
}
info, err := deviceInfo(&sec, podUID, logger)
if err != nil {
return nil, err
}
if info != nil {
devices = append(devices, info)
}
}
podUID := ""
pod := new(corev1.Pod)
if err := a.Get(ctx, types.NamespacedName{Namespace: sec.Namespace, Name: sec.Name}, pod); err != nil && !apierrors.IsNotFound(err) {
return dev, err
} else if err == nil {
podUID = string(pod.ObjectMeta.UID)
}
return deviceInfo(sec, podUID, logger)
return devices, nil
}
// device contains tailscale state of a proxy device as gathered from its tailscale state Secret.
@@ -534,7 +618,7 @@ var proxyYaml []byte
//go:embed deploy/manifests/userspace-proxy.yaml
var userspaceProxyYaml []byte
func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecret string) (*appsv1.StatefulSet, error) {
func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecrets []string) (*appsv1.StatefulSet, error) {
ss := new(appsv1.StatefulSet)
if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding
if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil {
@@ -573,18 +657,22 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod
}
if sts.Replicas > 0 {
ss.Spec.Replicas = ptr.To(sts.Replicas)
}
// Generic containerboot configuration options.
container.Env = append(container.Env,
corev1.EnvVar{
Name: "TS_KUBE_SECRET",
Value: proxySecret,
Value: "$(POD_NAME)",
},
corev1.EnvVar{
// New style is in the form of cap-<capability-version>.hujson.
Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR",
Value: "/etc/tsconfig",
Value: "/etc/tsconfig/$(POD_NAME)",
},
)
if sts.ForwardClusterTrafficViaL7IngressProxy {
container.Env = append(container.Env, corev1.EnvVar{
Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS",
@@ -592,20 +680,23 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
})
}
configVolume := corev1.Volume{
Name: "tailscaledconfig",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: proxySecret,
for i, secret := range proxySecrets {
configVolume := corev1.Volume{
Name: "tailscaledconfig-" + strconv.Itoa(i),
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secret,
},
},
},
}
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, configVolume)
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: fmt.Sprintf("tailscaledconfig-%d", i),
ReadOnly: true,
MountPath: path.Join("/etc/tsconfig/", secret),
})
}
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, configVolume)
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "tailscaledconfig",
ReadOnly: true,
MountPath: "/etc/tsconfig",
})
if a.tsFirewallMode != "" {
container.Env = append(container.Env, corev1.EnvVar{
@@ -643,22 +734,27 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
} else if sts.ServeConfig != nil {
container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_SERVE_CONFIG",
Value: "/etc/tailscaled/serve-config",
Value: "/etc/tailscaled/$(POD_NAME)/serve-config",
})
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "serve-config",
ReadOnly: true,
MountPath: "/etc/tailscaled",
})
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
Name: "serve-config",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: proxySecret,
Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}},
for i, secret := range proxySecrets {
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "serve-config-" + strconv.Itoa(i),
ReadOnly: true,
MountPath: path.Join("/etc/tailscaled", secret),
})
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
Name: "serve-config-" + strconv.Itoa(i),
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secret,
Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}},
},
},
},
})
})
}
}
app, err := appInfoForProxy(sts)
@@ -918,13 +1014,13 @@ func isMainContainer(c *corev1.Container) bool {
// tailscaledConfig takes a proxy config, a newly generated auth key if generated and a Secret with the previous proxy
// state and auth key and returns tailscaled config files for currently supported proxy versions.
func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret, hostname string) (tailscaledConfigs, error) {
conf := &ipn.ConfigVAlpha{
Version: "alpha0",
AcceptDNS: "false",
AcceptRoutes: "false", // AcceptRoutes defaults to true
Locked: "false",
Hostname: &stsC.Hostname,
Hostname: &hostname,
NoStatefulFiltering: "true", // Explicitly enforce default value, see #14216
AppConnector: &ipn.AppConnectorPrefs{Advertise: false},
}