cmd/k8s-operator: always set ProxyGroup status conditions (#16429)

Refactors setting status into its own top-level function to make it
easier to ensure we _always_ set the status if it's changed on every
reconcile. Previously, it was possible to have stale status if some
earlier part of the provision logic failed.

Updates #16327

Change-Id: Idab0cfc15ae426cf6914a82f0d37a5cc7845236b
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
Tom Proctor
2025-07-07 00:40:56 +01:00
committed by GitHub
parent 92a114c66d
commit 079134d3c0
6 changed files with 217 additions and 177 deletions
+163 -147
View File
@@ -13,6 +13,7 @@ import (
"net/http"
"net/netip"
"slices"
"sort"
"strings"
"sync"
@@ -48,7 +49,6 @@ const (
reasonProxyGroupCreationFailed = "ProxyGroupCreationFailed"
reasonProxyGroupReady = "ProxyGroupReady"
reasonProxyGroupCreating = "ProxyGroupCreating"
reasonProxyGroupInvalid = "ProxyGroupInvalid"
// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
@@ -132,17 +132,15 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
}
oldPGStatus := pg.Status.DeepCopy()
setStatusReady := func(pg *tsapi.ProxyGroup, status metav1.ConditionStatus, reason, message string) (reconcile.Result, error) {
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, status, reason, message, pg.Generation, r.clock, logger)
if !apiequality.Semantic.DeepEqual(oldPGStatus, &pg.Status) {
// An error encountered here should get returned by the Reconcile function.
if updateErr := r.Client.Status().Update(ctx, pg); updateErr != nil {
err = errors.Join(err, updateErr)
}
}
return reconcile.Result{}, err
}
staticEndpoints, nrr, err := r.reconcilePG(ctx, pg, logger)
return reconcile.Result{}, errors.Join(err, r.maybeUpdateStatus(ctx, logger, pg, oldPGStatus, nrr, staticEndpoints))
}
// reconcilePG handles all reconciliation of a ProxyGroup that is not marked
// for deletion. It is separated out from Reconcile to make a clear separation
// between reconciling the ProxyGroup, and posting the status of its created
// resources onto the ProxyGroup status field.
func (r *ProxyGroupReconciler) reconcilePG(ctx context.Context, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (map[string][]netip.AddrPort, *notReadyReason, error) {
if !slices.Contains(pg.Finalizers, FinalizerName) {
// This log line is printed exactly once during initial provisioning,
// because once the finalizer is in place this block gets skipped. So,
@@ -150,18 +148,11 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
// operation is underway.
logger.Infof("ensuring ProxyGroup is set up")
pg.Finalizers = append(pg.Finalizers, FinalizerName)
if err = r.Update(ctx, pg); err != nil {
err = fmt.Errorf("error adding finalizer: %w", err)
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreationFailed, reasonProxyGroupCreationFailed)
if err := r.Update(ctx, pg); err != nil {
return r.notReadyErrf(pg, "error adding finalizer: %w", err)
}
}
if err = r.validate(pg); err != nil {
message := fmt.Sprintf("ProxyGroup is invalid: %s", err)
r.recorder.Eventf(pg, corev1.EventTypeWarning, reasonProxyGroupInvalid, message)
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupInvalid, message)
}
proxyClassName := r.defaultProxyClass
if pg.Spec.ProxyClass != "" {
proxyClassName = pg.Spec.ProxyClass
@@ -172,78 +163,33 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
proxyClass = new(tsapi.ProxyClass)
err := r.Get(ctx, types.NamespacedName{Name: proxyClassName}, proxyClass)
if apierrors.IsNotFound(err) {
err = nil
message := fmt.Sprintf("the ProxyGroup's ProxyClass %s does not (yet) exist", proxyClassName)
logger.Info(message)
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreating, message)
msg := fmt.Sprintf("the ProxyGroup's ProxyClass %q does not (yet) exist", proxyClassName)
logger.Info(msg)
return r.notReady(reasonProxyGroupCreating, msg)
}
if err != nil {
err = fmt.Errorf("error getting ProxyGroup's ProxyClass %s: %s", proxyClassName, err)
r.recorder.Eventf(pg, corev1.EventTypeWarning, reasonProxyGroupCreationFailed, err.Error())
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreationFailed, err.Error())
return r.notReadyErrf(pg, "error getting ProxyGroup's ProxyClass %q: %w", proxyClassName, err)
}
validateProxyClassForPG(logger, pg, proxyClass)
if !tsoperator.ProxyClassIsReady(proxyClass) {
message := fmt.Sprintf("the ProxyGroup's ProxyClass %s is not yet in a ready state, waiting...", proxyClassName)
logger.Info(message)
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreating, message)
}
}
isProvisioned, err := r.maybeProvision(ctx, pg, proxyClass)
if err != nil {
reason := reasonProxyGroupCreationFailed
msg := fmt.Sprintf("error provisioning ProxyGroup resources: %s", err)
if strings.Contains(err.Error(), optimisticLockErrorMsg) {
reason = reasonProxyGroupCreating
msg = fmt.Sprintf("optimistic lock error, retrying: %s", err)
err = nil
msg := fmt.Sprintf("the ProxyGroup's ProxyClass %q is not yet in a ready state, waiting...", proxyClassName)
logger.Info(msg)
return r.notReady(reasonProxyGroupCreating, msg)
}
}
staticEndpoints, nrr, err := r.maybeProvision(ctx, pg, proxyClass)
if err != nil {
if strings.Contains(err.Error(), optimisticLockErrorMsg) {
msg := fmt.Sprintf("optimistic lock error, retrying: %s", nrr.message)
logger.Info(msg)
return r.notReady(reasonProxyGroupCreating, msg)
} else {
r.recorder.Eventf(pg, corev1.EventTypeWarning, reason, msg)
}
return setStatusReady(pg, metav1.ConditionFalse, reason, msg)
}
if !isProvisioned {
if !apiequality.Semantic.DeepEqual(oldPGStatus, &pg.Status) {
// An error encountered here should get returned by the Reconcile function.
if updateErr := r.Client.Status().Update(ctx, pg); updateErr != nil {
return reconcile.Result{}, errors.Join(err, updateErr)
}
}
return
}
desiredReplicas := int(pgReplicas(pg))
// Set ProxyGroupAvailable condition.
status := metav1.ConditionFalse
reason := reasonProxyGroupCreating
message := fmt.Sprintf("%d/%d ProxyGroup pods running", len(pg.Status.Devices), desiredReplicas)
if len(pg.Status.Devices) > 0 {
status = metav1.ConditionTrue
if len(pg.Status.Devices) == desiredReplicas {
reason = reasonProxyGroupReady
return nil, nrr, err
}
}
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupAvailable, status, reason, message, pg.Generation, r.clock, logger)
// Set ProxyGroupReady condition.
if len(pg.Status.Devices) < desiredReplicas {
logger.Debug(message)
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreating, message)
}
if len(pg.Status.Devices) > desiredReplicas {
message = fmt.Sprintf("waiting for %d ProxyGroup pods to shut down", len(pg.Status.Devices)-desiredReplicas)
logger.Debug(message)
return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreating, message)
}
logger.Info("ProxyGroup resources synced")
return setStatusReady(pg, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady)
return staticEndpoints, nrr, nil
}
// validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup.
@@ -271,7 +217,7 @@ func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc
}
}
func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (isProvisioned bool, err error) {
func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (map[string][]netip.AddrPort, *notReadyReason, error) {
logger := r.logger(pg.Name)
r.mu.Lock()
r.ensureAddedToGaugeForProxyGroup(pg)
@@ -280,31 +226,30 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
svcToNodePorts := make(map[string]uint16)
var tailscaledPort *uint16
if proxyClass != nil && proxyClass.Spec.StaticEndpoints != nil {
var err error
svcToNodePorts, tailscaledPort, err = r.ensureNodePortServiceCreated(ctx, pg, proxyClass)
if err != nil {
wrappedErr := fmt.Errorf("error provisioning NodePort Services for static endpoints: %w", err)
var allocatePortErr *allocatePortsErr
if errors.As(err, &allocatePortErr) {
reason := reasonProxyGroupCreationFailed
msg := fmt.Sprintf("error provisioning ProxyGroup resources: %s", wrappedErr)
r.setStatusReady(pg, metav1.ConditionFalse, reason, msg, logger)
return false, nil
msg := fmt.Sprintf("error provisioning NodePort Services for static endpoints: %v", err)
r.recorder.Event(pg, corev1.EventTypeWarning, reason, msg)
return r.notReady(reason, msg)
}
return false, wrappedErr
return r.notReadyErrf(pg, "error provisioning NodePort Services for static endpoints: %w", err)
}
}
staticEndpoints, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass, svcToNodePorts)
if err != nil {
wrappedErr := fmt.Errorf("error provisioning config Secrets: %w", err)
var selectorErr *FindStaticEndpointErr
if errors.As(err, &selectorErr) {
reason := reasonProxyGroupCreationFailed
msg := fmt.Sprintf("error provisioning ProxyGroup resources: %s", wrappedErr)
r.setStatusReady(pg, metav1.ConditionFalse, reason, msg, logger)
return false, nil
msg := fmt.Sprintf("error provisioning config Secrets: %v", err)
r.recorder.Event(pg, corev1.EventTypeWarning, reason, msg)
return r.notReady(reason, msg)
}
return false, wrappedErr
return r.notReadyErrf(pg, "error provisioning config Secrets: %w", err)
}
// State secrets are precreated so we can use the ProxyGroup CR as their owner ref.
@@ -315,7 +260,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
s.ObjectMeta.Annotations = sec.ObjectMeta.Annotations
s.ObjectMeta.OwnerReferences = sec.ObjectMeta.OwnerReferences
}); err != nil {
return false, fmt.Errorf("error provisioning state Secrets: %w", err)
return r.notReadyErrf(pg, "error provisioning state Secrets: %w", err)
}
}
sa := pgServiceAccount(pg, r.tsNamespace)
@@ -324,7 +269,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
s.ObjectMeta.Annotations = sa.ObjectMeta.Annotations
s.ObjectMeta.OwnerReferences = sa.ObjectMeta.OwnerReferences
}); err != nil {
return false, fmt.Errorf("error provisioning ServiceAccount: %w", err)
return r.notReadyErrf(pg, "error provisioning ServiceAccount: %w", err)
}
role := pgRole(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
@@ -333,7 +278,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
r.ObjectMeta.OwnerReferences = role.ObjectMeta.OwnerReferences
r.Rules = role.Rules
}); err != nil {
return false, fmt.Errorf("error provisioning Role: %w", err)
return r.notReadyErrf(pg, "error provisioning Role: %w", err)
}
roleBinding := pgRoleBinding(pg, r.tsNamespace)
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, roleBinding, func(r *rbacv1.RoleBinding) {
@@ -343,7 +288,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
r.RoleRef = roleBinding.RoleRef
r.Subjects = roleBinding.Subjects
}); err != nil {
return false, fmt.Errorf("error provisioning RoleBinding: %w", err)
return r.notReadyErrf(pg, "error provisioning RoleBinding: %w", err)
}
if pg.Spec.Type == tsapi.ProxyGroupTypeEgress {
cm, hp := pgEgressCM(pg, r.tsNamespace)
@@ -352,7 +297,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
mak.Set(&existing.BinaryData, egressservices.KeyHEPPings, hp)
}); err != nil {
return false, fmt.Errorf("error provisioning egress ConfigMap %q: %w", cm.Name, err)
return r.notReadyErrf(pg, "error provisioning egress ConfigMap %q: %w", cm.Name, err)
}
}
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
@@ -361,28 +306,27 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
existing.ObjectMeta.Labels = cm.ObjectMeta.Labels
existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
}); err != nil {
return false, fmt.Errorf("error provisioning ingress ConfigMap %q: %w", cm.Name, err)
return r.notReadyErrf(pg, "error provisioning ingress ConfigMap %q: %w", cm.Name, err)
}
}
ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode, tailscaledPort, proxyClass)
if err != nil {
return false, fmt.Errorf("error generating StatefulSet spec: %w", err)
return r.notReadyErrf(pg, "error generating StatefulSet spec: %w", err)
}
cfg := &tailscaleSTSConfig{
proxyType: string(pg.Spec.Type),
}
ss = applyProxyClassToStatefulSet(proxyClass, ss, cfg, logger)
updateSS := func(s *appsv1.StatefulSet) {
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, ss, func(s *appsv1.StatefulSet) {
s.Spec = ss.Spec
s.ObjectMeta.Labels = ss.ObjectMeta.Labels
s.ObjectMeta.Annotations = ss.ObjectMeta.Annotations
s.ObjectMeta.OwnerReferences = ss.ObjectMeta.OwnerReferences
}); err != nil {
return r.notReadyErrf(pg, "error provisioning StatefulSet: %w", err)
}
if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, ss, updateSS); err != nil {
return false, fmt.Errorf("error provisioning StatefulSet: %w", err)
}
mo := &metricsOpts{
tsNamespace: r.tsNamespace,
proxyStsName: pg.Name,
@@ -390,21 +334,67 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
proxyType: "proxygroup",
}
if err := reconcileMetricsResources(ctx, logger, mo, proxyClass, r.Client); err != nil {
return false, fmt.Errorf("error reconciling metrics resources: %w", err)
return r.notReadyErrf(pg, "error reconciling metrics resources: %w", err)
}
if err := r.cleanupDanglingResources(ctx, pg, proxyClass); err != nil {
return false, fmt.Errorf("error cleaning up dangling resources: %w", err)
return r.notReadyErrf(pg, "error cleaning up dangling resources: %w", err)
}
devices, err := r.getDeviceInfo(ctx, staticEndpoints, pg)
logger.Info("ProxyGroup resources synced")
return staticEndpoints, nil, nil
}
// maybeUpdateStatus recomputes the ProxyGroup's status in memory and writes it
// back through the status client only when it differs from oldPGStatus.
//
// It lists the running proxies (passing endpoints through to
// getRunningProxies), stores them in pg.Status.Devices, and then sets the
// ProxyGroupAvailable and ProxyGroupReady conditions. A non-nil nrr carries
// the reason and message of an earlier failure and overrides the
// device-count-based reason and message of the Ready condition.
//
// The returned error is the listing error, if any, joined with any error from
// the status update.
func (r *ProxyGroupReconciler) maybeUpdateStatus(ctx context.Context, logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, oldPGStatus *tsapi.ProxyGroupStatus, nrr *notReadyReason, endpoints map[string][]netip.AddrPort) (err error) {
// Deferred so the comparison and status write also run on the early error
// return below; err is a named result so the update error can be joined into
// whatever is being returned.
defer func() {
if !apiequality.Semantic.DeepEqual(*oldPGStatus, pg.Status) {
if updateErr := r.Client.Status().Update(ctx, pg); updateErr != nil {
err = errors.Join(err, updateErr)
}
}
}()
devices, err := r.getRunningProxies(ctx, pg, endpoints)
if err != nil {
return false, fmt.Errorf("failed to get device info: %w", err)
return fmt.Errorf("failed to list running proxies: %w", err)
}
pg.Status.Devices = devices
return true, nil
desiredReplicas := int(pgReplicas(pg))
// Set ProxyGroupAvailable condition.
// Available is true as soon as at least one device is listed; the reason
// only becomes reasonProxyGroupReady once the count equals desiredReplicas.
status := metav1.ConditionFalse
reason := reasonProxyGroupCreating
message := fmt.Sprintf("%d/%d ProxyGroup pods running", len(devices), desiredReplicas)
if len(devices) > 0 {
status = metav1.ConditionTrue
if len(devices) == desiredReplicas {
reason = reasonProxyGroupReady
}
}
// NOTE(review): the sixth argument is 0 here, whereas the Ready condition
// below passes pg.Generation — presumably deliberate; confirm.
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupAvailable, status, reason, message, 0, r.clock, logger)
// Set ProxyGroupReady condition.
// Start from "not ready / creating"; message still holds the
// "%d/%d ProxyGroup pods running" text unless a case below replaces it.
status = metav1.ConditionFalse
reason = reasonProxyGroupCreating
switch {
case nrr != nil:
// If we failed earlier, that reason takes precedence.
reason = nrr.reason
message = nrr.message
case len(devices) < desiredReplicas:
// Intentionally empty: keeps the defaults assigned just above the switch.
case len(devices) > desiredReplicas:
message = fmt.Sprintf("waiting for %d ProxyGroup pods to shut down", len(devices)-desiredReplicas)
default:
status = metav1.ConditionTrue
reason = reasonProxyGroupReady
message = reasonProxyGroupReady
}
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, status, reason, message, pg.Generation, r.clock, logger)
return nil
}
// getServicePortsForProxyGroups returns a map of ProxyGroup Service names to their NodePorts,
@@ -484,15 +474,15 @@ func (r *ProxyGroupReconciler) ensureNodePortServiceCreated(ctx context.Context,
tailscaledPort := getRandomPort()
svcs := []*corev1.Service{}
for i := range pgReplicas(pg) {
replicaName := pgNodePortServiceName(pg.Name, i)
nodePortSvcName := pgNodePortServiceName(pg.Name, i)
svc := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: replicaName, Namespace: r.tsNamespace}, svc)
err := r.Get(ctx, types.NamespacedName{Name: nodePortSvcName, Namespace: r.tsNamespace}, svc)
if err != nil && !apierrors.IsNotFound(err) {
return nil, nil, fmt.Errorf("error getting Kubernetes Service %q: %w", replicaName, err)
return nil, nil, fmt.Errorf("error getting Kubernetes Service %q: %w", nodePortSvcName, err)
}
if apierrors.IsNotFound(err) {
svcs = append(svcs, pgNodePortService(pg, replicaName, r.tsNamespace))
svcs = append(svcs, pgNodePortService(pg, nodePortSvcName, r.tsNamespace))
} else {
// NOTE: if we can we want to recover the random port used for tailscaled,
// as well as the NodePort previously used for that Service
@@ -638,7 +628,7 @@ func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, id tailc
func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass, svcToNodePorts map[string]uint16) (endpoints map[string][]netip.AddrPort, err error) {
logger := r.logger(pg.Name)
endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg))
endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg)) // keyed by Service name.
for i := range pgReplicas(pg) {
cfgSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
@@ -691,14 +681,15 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
}
}
replicaName := pgNodePortServiceName(pg.Name, i)
nodePortSvcName := pgNodePortServiceName(pg.Name, i)
if len(svcToNodePorts) > 0 {
port, ok := svcToNodePorts[replicaName]
replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
port, ok := svcToNodePorts[nodePortSvcName]
if !ok {
return nil, fmt.Errorf("could not find configured NodePort for ProxyGroup replica %q", replicaName)
}
endpoints[replicaName], err = r.findStaticEndpoints(ctx, existingCfgSecret, proxyClass, port, logger)
endpoints[nodePortSvcName], err = r.findStaticEndpoints(ctx, existingCfgSecret, proxyClass, port, logger)
if err != nil {
return nil, fmt.Errorf("could not find static endpoints for replica %q: %w", replicaName, err)
}
@@ -711,7 +702,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
return nil, err
}
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, endpoints[replicaName], existingAdvertiseServices, r.loginServer)
configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, endpoints[nodePortSvcName], existingAdvertiseServices, r.loginServer)
if err != nil {
return nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
@@ -910,16 +901,14 @@ func extractAdvertiseServicesConfig(cfgSecret *corev1.Secret) ([]string, error)
return conf.AdvertiseServices, nil
}
// validate is a no-op: it ignores the ProxyGroup it is given and always
// returns nil.
func (r *ProxyGroupReconciler) validate(_ *tsapi.ProxyGroup) error {
return nil
}
// getNodeMetadata gets metadata for all the pods owned by this ProxyGroup by
// querying their state Secrets. It may not return the same number of items as
// specified in the ProxyGroup spec if e.g. it is getting scaled up or down, or
// some pods have failed to write state.
//
// The returned metadata will contain an entry for each state Secret that exists.
func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.ProxyGroup) (metadata []nodeMetadata, _ error) {
// List all state secrets owned by this ProxyGroup.
// List all state Secrets owned by this ProxyGroup.
secrets := &corev1.SecretList{}
if err := r.List(ctx, secrets, client.InNamespace(r.tsNamespace), client.MatchingLabels(pgSecretLabels(pg.Name, "state"))); err != nil {
return nil, fmt.Errorf("failed to list state Secrets: %w", err)
@@ -930,20 +919,20 @@ func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.Pr
return nil, fmt.Errorf("unexpected secret %s was labelled as owned by the ProxyGroup %s: %w", secret.Name, pg.Name, err)
}
nm := nodeMetadata{
ordinal: ordinal,
stateSecret: &secret,
}
prefs, ok, err := getDevicePrefs(&secret)
if err != nil {
return nil, err
}
if !ok {
continue
if ok {
nm.tsID = prefs.Config.NodeID
nm.dnsName = prefs.Config.UserProfile.LoginName
}
nm := nodeMetadata{
ordinal: ordinal,
stateSecret: &secret,
tsID: prefs.Config.NodeID,
dnsName: prefs.Config.UserProfile.LoginName,
}
pod := &corev1.Pod{}
if err := r.Get(ctx, client.ObjectKey{Namespace: r.tsNamespace, Name: fmt.Sprintf("%s-%d", pg.Name, ordinal)}, pod); err != nil && !apierrors.IsNotFound(err) {
return nil, err
@@ -953,23 +942,36 @@ func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.Pr
metadata = append(metadata, nm)
}
// Sort for predictable ordering and status.
sort.Slice(metadata, func(i, j int) bool {
return metadata[i].ordinal < metadata[j].ordinal
})
return metadata, nil
}
func (r *ProxyGroupReconciler) getDeviceInfo(ctx context.Context, staticEndpoints map[string][]netip.AddrPort, pg *tsapi.ProxyGroup) (devices []tsapi.TailnetDevice, _ error) {
// getRunningProxies will return status for all proxy Pods whose state Secret
// has an up to date Pod UID and at least a hostname.
func (r *ProxyGroupReconciler) getRunningProxies(ctx context.Context, pg *tsapi.ProxyGroup, staticEndpoints map[string][]netip.AddrPort) (devices []tsapi.TailnetDevice, _ error) {
metadata, err := r.getNodeMetadata(ctx, pg)
if err != nil {
return nil, err
}
for _, m := range metadata {
if !strings.EqualFold(string(m.stateSecret.Data[kubetypes.KeyPodUID]), m.podUID) {
for i, m := range metadata {
if m.podUID == "" || !strings.EqualFold(string(m.stateSecret.Data[kubetypes.KeyPodUID]), m.podUID) {
// Current Pod has not yet written its UID to the state Secret, data may
// be stale.
continue
}
device := tsapi.TailnetDevice{}
if hostname, _, ok := strings.Cut(string(m.stateSecret.Data[kubetypes.KeyDeviceFQDN]), "."); ok {
device.Hostname = hostname
} else {
continue
}
if ipsB := m.stateSecret.Data[kubetypes.KeyDeviceIPs]; len(ipsB) > 0 {
ips := []string{}
if err := json.Unmarshal(ipsB, &ips); err != nil {
@@ -978,11 +980,10 @@ func (r *ProxyGroupReconciler) getDeviceInfo(ctx context.Context, staticEndpoint
device.TailnetIPs = ips
}
if hostname, _, ok := strings.Cut(string(m.stateSecret.Data[kubetypes.KeyDeviceFQDN]), "."); ok {
device.Hostname = hostname
}
if ep, ok := staticEndpoints[device.Hostname]; ok && len(ep) > 0 {
// TODO(tomhjp): This is our input to the proxy, but we should instead
// read this back from the proxy's state in some way to more accurately
// reflect its status.
if ep, ok := staticEndpoints[pgNodePortServiceName(pg.Name, int32(i))]; ok && len(ep) > 0 {
eps := make([]string, 0, len(ep))
for _, e := range ep {
eps = append(eps, e.String())
@@ -999,13 +1000,28 @@ func (r *ProxyGroupReconciler) getDeviceInfo(ctx context.Context, staticEndpoint
// nodeMetadata describes a single ProxyGroup replica as assembled by
// getNodeMetadata: the replica's ordinal, its state Secret, and the identity
// details read from that Secret and from the replica's Pod.
type nodeMetadata struct {
ordinal int
stateSecret *corev1.Secret
// podUID is the UID of the current Pod or empty if the Pod does not exist.
podUID string
tsID tailcfg.StableNodeID
dnsName string
podUID string // or empty if the Pod no longer exists.
tsID tailcfg.StableNodeID
dnsName string
}
func (pr *ProxyGroupReconciler) setStatusReady(pg *tsapi.ProxyGroup, status metav1.ConditionStatus, reason string, msg string, logger *zap.SugaredLogger) {
pr.recorder.Eventf(pg, corev1.EventTypeWarning, reason, msg)
tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, status, reason, msg, pg.Generation, pr.clock, logger)
// notReady builds the return values for a reconcile step that stops early
// without an error: no static endpoints, a notReadyReason carrying the given
// reason and message, and a nil error.
func (r *ProxyGroupReconciler) notReady(reason, msg string) (map[string][]netip.AddrPort, *notReadyReason, error) {
	nrr := &notReadyReason{reason: reason, message: msg}
	return nil, nrr, nil
}
// notReadyErrf builds the return values for a reconcile step that failed.
//
// format and a are passed to fmt.Errorf, so %w can be used to wrap an
// underlying error. The resulting error text is recorded on pg as a Warning
// event with reason reasonProxyGroupCreationFailed, and is also used as the
// message of the returned notReadyReason. The error itself is returned as the
// third value; the endpoints map is always nil.
func (r *ProxyGroupReconciler) notReadyErrf(pg *tsapi.ProxyGroup, format string, a ...any) (map[string][]netip.AddrPort, *notReadyReason, error) {
	wrapped := fmt.Errorf(format, a...)
	text := wrapped.Error()
	r.recorder.Event(pg, corev1.EventTypeWarning, reasonProxyGroupCreationFailed, text)

	nrr := &notReadyReason{
		reason:  reasonProxyGroupCreationFailed,
		message: text,
	}
	return nil, nrr, wrapped
}
// notReadyReason records why a ProxyGroup could not be marked ready: reason is
// the short condition reason and message is the accompanying human-readable
// text. Both are copied onto the ProxyGroupReady condition when present.
type notReadyReason struct {
	reason, message string
}