cmd/k8s-operator: add multi replica support for recorders (#17864)

This commit adds the `spec.replicas` field to the `Recorder` custom
resource that allows for a highly available deployment of `tsrecorder`
within a kubernetes cluster.

Many changes were required here as the code hard-coded the assumption
of a single replica. This has required a few loops, similar to what we
do for the `Connector` resource to create auth and state secrets. It
was also required to add a check to remove dangling state and auth
secrets should the recorder be scaled down.

Updates: https://github.com/tailscale/tailscale/issues/17965

Signed-off-by: David Bond <davidsbond93@gmail.com>
This commit is contained in:
David Bond
2025-11-20 11:46:34 +00:00
committed by GitHub
parent 682172ca2d
commit 42a5262016
10 changed files with 380 additions and 151 deletions
+204 -90
View File
@@ -12,6 +12,7 @@ import (
"fmt"
"net/http"
"slices"
"strconv"
"strings"
"sync"
@@ -29,6 +30,7 @@ import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"tailscale.com/client/tailscale"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
@@ -69,13 +71,13 @@ func (r *RecorderReconciler) logger(name string) *zap.SugaredLogger {
return r.log.With("Recorder", name)
}
func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
logger := r.logger(req.Name)
logger.Debugf("starting reconcile")
defer logger.Debugf("reconcile finished")
tsr := new(tsapi.Recorder)
err = r.Get(ctx, req.NamespacedName, tsr)
err := r.Get(ctx, req.NamespacedName, tsr)
if apierrors.IsNotFound(err) {
logger.Debugf("Recorder not found, assuming it was deleted")
return reconcile.Result{}, nil
@@ -98,7 +100,7 @@ func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Reques
}
tsr.Finalizers = slices.Delete(tsr.Finalizers, ix, ix+1)
if err := r.Update(ctx, tsr); err != nil {
if err = r.Update(ctx, tsr); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
@@ -110,10 +112,11 @@ func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Reques
if !apiequality.Semantic.DeepEqual(oldTSRStatus, &tsr.Status) {
// An error encountered here should get returned by the Reconcile function.
if updateErr := r.Client.Status().Update(ctx, tsr); updateErr != nil {
err = errors.Join(err, updateErr)
return reconcile.Result{}, errors.Join(err, updateErr)
}
}
return reconcile.Result{}, err
return reconcile.Result{}, nil
}
if !slices.Contains(tsr.Finalizers, FinalizerName) {
@@ -123,12 +126,12 @@ func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Reques
// operation is underway.
logger.Infof("ensuring Recorder is set up")
tsr.Finalizers = append(tsr.Finalizers, FinalizerName)
if err := r.Update(ctx, tsr); err != nil {
if err = r.Update(ctx, tsr); err != nil {
return setStatusReady(tsr, metav1.ConditionFalse, reasonRecorderCreationFailed, reasonRecorderCreationFailed)
}
}
if err := r.validate(ctx, tsr); err != nil {
if err = r.validate(ctx, tsr); err != nil {
message := fmt.Sprintf("Recorder is invalid: %s", err)
r.recorder.Eventf(tsr, corev1.EventTypeWarning, reasonRecorderInvalid, message)
return setStatusReady(tsr, metav1.ConditionFalse, reasonRecorderInvalid, message)
@@ -160,19 +163,29 @@ func (r *RecorderReconciler) maybeProvision(ctx context.Context, tsr *tsapi.Reco
gaugeRecorderResources.Set(int64(r.recorders.Len()))
r.mu.Unlock()
if err := r.ensureAuthSecretCreated(ctx, tsr); err != nil {
if err := r.ensureAuthSecretsCreated(ctx, tsr); err != nil {
return fmt.Errorf("error creating secrets: %w", err)
}
// State Secret is precreated so we can use the Recorder CR as its owner ref.
sec := tsrStateSecret(tsr, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sec, func(s *corev1.Secret) {
s.ObjectMeta.Labels = sec.ObjectMeta.Labels
s.ObjectMeta.Annotations = sec.ObjectMeta.Annotations
}); err != nil {
return fmt.Errorf("error creating state Secret: %w", err)
// State Secrets are pre-created so we can use the Recorder CR as its owner ref.
var replicas int32 = 1
if tsr.Spec.Replicas != nil {
replicas = *tsr.Spec.Replicas
}
for replica := range replicas {
sec := tsrStateSecret(tsr, r.tsNamespace, replica)
_, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sec, func(s *corev1.Secret) {
s.ObjectMeta.Labels = sec.ObjectMeta.Labels
s.ObjectMeta.Annotations = sec.ObjectMeta.Annotations
})
if err != nil {
return fmt.Errorf("error creating state Secret %q: %w", sec.Name, err)
}
}
sa := tsrServiceAccount(tsr, r.tsNamespace)
if _, err := createOrMaybeUpdate(ctx, r.Client, r.tsNamespace, sa, func(s *corev1.ServiceAccount) error {
_, err := createOrMaybeUpdate(ctx, r.Client, r.tsNamespace, sa, func(s *corev1.ServiceAccount) error {
// Perform this check within the update function to make sure we don't
// have a race condition between the previous check and the update.
if err := saOwnedByRecorder(s, tsr); err != nil {
@@ -183,54 +196,68 @@ func (r *RecorderReconciler) maybeProvision(ctx context.Context, tsr *tsapi.Reco
s.ObjectMeta.Annotations = sa.ObjectMeta.Annotations
return nil
}); err != nil {
})
if err != nil {
return fmt.Errorf("error creating ServiceAccount: %w", err)
}
role := tsrRole(tsr, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
_, err = createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
r.ObjectMeta.Labels = role.ObjectMeta.Labels
r.ObjectMeta.Annotations = role.ObjectMeta.Annotations
r.Rules = role.Rules
}); err != nil {
})
if err != nil {
return fmt.Errorf("error creating Role: %w", err)
}
roleBinding := tsrRoleBinding(tsr, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, roleBinding, func(r *rbacv1.RoleBinding) {
_, err = createOrUpdate(ctx, r.Client, r.tsNamespace, roleBinding, func(r *rbacv1.RoleBinding) {
r.ObjectMeta.Labels = roleBinding.ObjectMeta.Labels
r.ObjectMeta.Annotations = roleBinding.ObjectMeta.Annotations
r.RoleRef = roleBinding.RoleRef
r.Subjects = roleBinding.Subjects
}); err != nil {
})
if err != nil {
return fmt.Errorf("error creating RoleBinding: %w", err)
}
ss := tsrStatefulSet(tsr, r.tsNamespace, r.loginServer)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, ss, func(s *appsv1.StatefulSet) {
_, err = createOrUpdate(ctx, r.Client, r.tsNamespace, ss, func(s *appsv1.StatefulSet) {
s.ObjectMeta.Labels = ss.ObjectMeta.Labels
s.ObjectMeta.Annotations = ss.ObjectMeta.Annotations
s.Spec = ss.Spec
}); err != nil {
})
if err != nil {
return fmt.Errorf("error creating StatefulSet: %w", err)
}
// ServiceAccount name may have changed, in which case we need to clean up
// the previous ServiceAccount. RoleBinding will already be updated to point
// to the new ServiceAccount.
if err := r.maybeCleanupServiceAccounts(ctx, tsr, sa.Name); err != nil {
if err = r.maybeCleanupServiceAccounts(ctx, tsr, sa.Name); err != nil {
return fmt.Errorf("error cleaning up ServiceAccounts: %w", err)
}
// If we have scaled the recorder down, we will have dangling state secrets
// that we need to clean up.
if err = r.maybeCleanupSecrets(ctx, tsr); err != nil {
return fmt.Errorf("error cleaning up Secrets: %w", err)
}
var devices []tsapi.RecorderTailnetDevice
for replica := range replicas {
dev, ok, err := r.getDeviceInfo(ctx, tsr.Name, replica)
switch {
case err != nil:
return fmt.Errorf("failed to get device info: %w", err)
case !ok:
logger.Debugf("no Tailscale hostname known yet, waiting for Recorder pod to finish auth")
continue
}
device, ok, err := r.getDeviceInfo(ctx, tsr.Name)
if err != nil {
return fmt.Errorf("failed to get device info: %w", err)
devices = append(devices, dev)
}
if !ok {
logger.Debugf("no Tailscale hostname known yet, waiting for Recorder pod to finish auth")
return nil
}
devices = append(devices, device)
tsr.Status.Devices = devices
@@ -257,22 +284,89 @@ func saOwnedByRecorder(sa *corev1.ServiceAccount, tsr *tsapi.Recorder) error {
func (r *RecorderReconciler) maybeCleanupServiceAccounts(ctx context.Context, tsr *tsapi.Recorder, currentName string) error {
logger := r.logger(tsr.Name)
// List all ServiceAccounts owned by this Recorder.
options := []client.ListOption{
client.InNamespace(r.tsNamespace),
client.MatchingLabels(tsrLabels("recorder", tsr.Name, nil)),
}
sas := &corev1.ServiceAccountList{}
if err := r.List(ctx, sas, client.InNamespace(r.tsNamespace), client.MatchingLabels(labels("recorder", tsr.Name, nil))); err != nil {
if err := r.List(ctx, sas, options...); err != nil {
return fmt.Errorf("error listing ServiceAccounts for cleanup: %w", err)
}
for _, sa := range sas.Items {
if sa.Name == currentName {
for _, serviceAccount := range sas.Items {
if serviceAccount.Name == currentName {
continue
}
if err := r.Delete(ctx, &sa); err != nil {
if apierrors.IsNotFound(err) {
logger.Debugf("ServiceAccount %s not found, likely already deleted", sa.Name)
} else {
return fmt.Errorf("error deleting ServiceAccount %s: %w", sa.Name, err)
err := r.Delete(ctx, &serviceAccount)
switch {
case apierrors.IsNotFound(err):
logger.Debugf("ServiceAccount %s not found, likely already deleted", serviceAccount.Name)
continue
case err != nil:
return fmt.Errorf("error deleting ServiceAccount %s: %w", serviceAccount.Name, err)
}
}
return nil
}
func (r *RecorderReconciler) maybeCleanupSecrets(ctx context.Context, tsr *tsapi.Recorder) error {
options := []client.ListOption{
client.InNamespace(r.tsNamespace),
client.MatchingLabels(tsrLabels("recorder", tsr.Name, nil)),
}
secrets := &corev1.SecretList{}
if err := r.List(ctx, secrets, options...); err != nil {
return fmt.Errorf("error listing Secrets for cleanup: %w", err)
}
// Get the largest ordinal suffix that we expect. Then we'll go through the list of secrets owned by this
// recorder and remove them.
var replicas int32 = 1
if tsr.Spec.Replicas != nil {
replicas = *tsr.Spec.Replicas
}
for _, secret := range secrets.Items {
parts := strings.Split(secret.Name, "-")
if len(parts) == 0 {
continue
}
ordinal, err := strconv.ParseUint(parts[len(parts)-1], 10, 32)
if err != nil {
return fmt.Errorf("error parsing secret name %q: %w", secret.Name, err)
}
if int32(ordinal) < replicas {
continue
}
devicePrefs, ok, err := getDevicePrefs(&secret)
if err != nil {
return err
}
if ok {
var errResp *tailscale.ErrResponse
r.log.Debugf("deleting device %s", devicePrefs.Config.NodeID)
err = r.tsClient.DeleteDevice(ctx, string(devicePrefs.Config.NodeID))
switch {
case errors.As(err, &errResp) && errResp.Status == http.StatusNotFound:
// This device has possibly already been deleted in the admin console. So we can ignore this
// and move on to removing the secret.
case err != nil:
return err
}
}
if err = r.Delete(ctx, &secret); err != nil {
return err
}
}
return nil
@@ -284,30 +378,38 @@ func (r *RecorderReconciler) maybeCleanupServiceAccounts(ctx context.Context, ts
func (r *RecorderReconciler) maybeCleanup(ctx context.Context, tsr *tsapi.Recorder) (bool, error) {
logger := r.logger(tsr.Name)
prefs, ok, err := r.getDevicePrefs(ctx, tsr.Name)
if err != nil {
return false, err
}
if !ok {
logger.Debugf("state Secret %s-0 not found or does not contain node ID, continuing cleanup", tsr.Name)
r.mu.Lock()
r.recorders.Remove(tsr.UID)
gaugeRecorderResources.Set(int64(r.recorders.Len()))
r.mu.Unlock()
return true, nil
var replicas int32 = 1
if tsr.Spec.Replicas != nil {
replicas = *tsr.Spec.Replicas
}
id := string(prefs.Config.NodeID)
logger.Debugf("deleting device %s from control", string(id))
if err := r.tsClient.DeleteDevice(ctx, string(id)); err != nil {
errResp := &tailscale.ErrResponse{}
if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
} else {
for replica := range replicas {
devicePrefs, ok, err := r.getDevicePrefs(ctx, tsr.Name, replica)
if err != nil {
return false, err
}
if !ok {
logger.Debugf("state Secret %s-%d not found or does not contain node ID, continuing cleanup", tsr.Name, replica)
r.mu.Lock()
r.recorders.Remove(tsr.UID)
gaugeRecorderResources.Set(int64(r.recorders.Len()))
r.mu.Unlock()
return true, nil
}
nodeID := string(devicePrefs.Config.NodeID)
logger.Debugf("deleting device %s from control", nodeID)
if err = r.tsClient.DeleteDevice(ctx, nodeID); err != nil {
errResp := &tailscale.ErrResponse{}
if errors.As(err, errResp) && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", nodeID)
continue
}
return false, fmt.Errorf("error deleting device: %w", err)
}
} else {
logger.Debugf("device %s deleted from control", string(id))
logger.Debugf("device %s deleted from control", nodeID)
}
// Unlike most log entries in the reconcile loop, this will get printed
@@ -319,38 +421,46 @@ func (r *RecorderReconciler) maybeCleanup(ctx context.Context, tsr *tsapi.Record
r.recorders.Remove(tsr.UID)
gaugeRecorderResources.Set(int64(r.recorders.Len()))
r.mu.Unlock()
return true, nil
}
func (r *RecorderReconciler) ensureAuthSecretCreated(ctx context.Context, tsr *tsapi.Recorder) error {
logger := r.logger(tsr.Name)
key := types.NamespacedName{
Namespace: r.tsNamespace,
Name: tsr.Name,
}
if err := r.Get(ctx, key, &corev1.Secret{}); err == nil {
// No updates, already created the auth key.
logger.Debugf("auth Secret %s already exists", key.Name)
return nil
} else if !apierrors.IsNotFound(err) {
return err
func (r *RecorderReconciler) ensureAuthSecretsCreated(ctx context.Context, tsr *tsapi.Recorder) error {
var replicas int32 = 1
if tsr.Spec.Replicas != nil {
replicas = *tsr.Spec.Replicas
}
// Create the auth key Secret which is going to be used by the StatefulSet
// to authenticate with Tailscale.
logger.Debugf("creating authkey for new Recorder")
tags := tsr.Spec.Tags
if len(tags) == 0 {
tags = tsapi.Tags{"tag:k8s"}
}
authKey, err := newAuthKey(ctx, r.tsClient, tags.Stringify())
if err != nil {
return err
}
logger.Debug("creating a new Secret for the Recorder")
if err := r.Create(ctx, tsrAuthSecret(tsr, r.tsNamespace, authKey)); err != nil {
return err
logger := r.logger(tsr.Name)
for replica := range replicas {
key := types.NamespacedName{
Namespace: r.tsNamespace,
Name: fmt.Sprintf("%s-auth-%d", tsr.Name, replica),
}
err := r.Get(ctx, key, &corev1.Secret{})
switch {
case err == nil:
logger.Debugf("auth Secret %q already exists", key.Name)
continue
case !apierrors.IsNotFound(err):
return fmt.Errorf("failed to get Secret %q: %w", key.Name, err)
}
authKey, err := newAuthKey(ctx, r.tsClient, tags.Stringify())
if err != nil {
return err
}
if err = r.Create(ctx, tsrAuthSecret(tsr, r.tsNamespace, authKey, replica)); err != nil {
return err
}
}
return nil
@@ -361,6 +471,10 @@ func (r *RecorderReconciler) validate(ctx context.Context, tsr *tsapi.Recorder)
return errors.New("must either enable UI or use S3 storage to ensure recordings are accessible")
}
if tsr.Spec.Replicas != nil && *tsr.Spec.Replicas > 1 && tsr.Spec.Storage.S3 == nil {
return errors.New("must use S3 storage when using multiple replicas to ensure recordings are accessible")
}
// Check any custom ServiceAccount config doesn't conflict with pre-existing
// ServiceAccounts. This check is performed once during validation to ensure
// errors are raised early, but also again during any Updates to prevent a race.
@@ -394,11 +508,11 @@ func (r *RecorderReconciler) validate(ctx context.Context, tsr *tsapi.Recorder)
return nil
}
func (r *RecorderReconciler) getStateSecret(ctx context.Context, tsrName string) (*corev1.Secret, error) {
func (r *RecorderReconciler) getStateSecret(ctx context.Context, tsrName string, replica int32) (*corev1.Secret, error) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.tsNamespace,
Name: fmt.Sprintf("%s-0", tsrName),
Name: fmt.Sprintf("%s-%d", tsrName, replica),
},
}
if err := r.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
@@ -412,8 +526,8 @@ func (r *RecorderReconciler) getStateSecret(ctx context.Context, tsrName string)
return secret, nil
}
func (r *RecorderReconciler) getDevicePrefs(ctx context.Context, tsrName string) (prefs prefs, ok bool, err error) {
secret, err := r.getStateSecret(ctx, tsrName)
func (r *RecorderReconciler) getDevicePrefs(ctx context.Context, tsrName string, replica int32) (prefs prefs, ok bool, err error) {
secret, err := r.getStateSecret(ctx, tsrName, replica)
if err != nil || secret == nil {
return prefs, false, err
}
@@ -441,8 +555,8 @@ func getDevicePrefs(secret *corev1.Secret) (prefs prefs, ok bool, err error) {
return prefs, ok, nil
}
func (r *RecorderReconciler) getDeviceInfo(ctx context.Context, tsrName string) (d tsapi.RecorderTailnetDevice, ok bool, err error) {
secret, err := r.getStateSecret(ctx, tsrName)
func (r *RecorderReconciler) getDeviceInfo(ctx context.Context, tsrName string, replica int32) (d tsapi.RecorderTailnetDevice, ok bool, err error) {
secret, err := r.getStateSecret(ctx, tsrName, replica)
if err != nil || secret == nil {
return tsapi.RecorderTailnetDevice{}, false, err
}