all-kube: create Tailscale Service for HA kube-apiserver ProxyGroup (#16572)

Adds a new reconciler for ProxyGroups of type kube-apiserver that will
provision a Tailscale Service for each replica to advertise. Adds two
new condition types to the ProxyGroup, TailscaleServiceValid and
TailscaleServiceConfigured, to post updates on the state of that
reconciler in a way that's consistent with the service-pg reconciler.
The created Tailscale Service name is configurable via a new ProxyGroup
field spec.kubeAPISserver.ServiceName, which expects a string of the
form "svc:<dns-label>".

Lots of supporting changes were needed to implement this in a way that's
consistent with other operator workflows, including:

* Pulled containerboot's ensureServicesUnadvertised and certManager into
  kube/ libraries to be shared with k8s-proxy. Use those in k8s-proxy to
  aid Service cert sharing between replicas and graceful Service shutdown.
* For certManager, add an initial wait to the cert loop to wait until
  the domain appears in the devices's netmap to avoid a guaranteed error
  on the first issue attempt when it's quick to start.
* Made several methods in ingress-for-pg.go and svc-for-pg.go into
  functions to share with the new reconciler
* Added a Resource struct to the owner refs stored in Tailscale Service
  annotations to be able to distinguish between Ingress- and ProxyGroup-
  based Services that need cleaning up in the Tailscale API.
* Added a ListVIPServices method to the internal tailscale client to aid
  cleaning up orphaned Services
* Support for reading config from a kube Secret, and partial support for
  config reloading, to prevent us having to force Pod restarts when
  config changes.
* Fixed up the zap logger so it's possible to set debug log level.

Updates #13358

Change-Id: Ia9607441157dd91fb9b6ecbc318eecbef446e116
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
Tom Proctor
2025-07-21 11:03:21 +01:00
committed by GitHub
parent 5adde9e3f3
commit f421907c38
39 changed files with 2551 additions and 397 deletions
+264
View File
@@ -0,0 +1,264 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
// Package config provides watchers for the various supported ways to load a
// config file for k8s-proxy; currently file or Kubernetes Secret.
package config
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/kubetypes"
"tailscale.com/types/ptr"
"tailscale.com/util/testenv"
)
type configLoader struct {
logger *zap.SugaredLogger
client clientcorev1.CoreV1Interface
cfgChan chan<- *conf.Config
previous []byte
once sync.Once // For use in tests. To close cfgIgnored.
cfgIgnored chan struct{} // For use in tests.
}
func NewConfigLoader(logger *zap.SugaredLogger, client clientcorev1.CoreV1Interface, cfgChan chan<- *conf.Config) *configLoader {
return &configLoader{
logger: logger,
client: client,
cfgChan: cfgChan,
}
}
func (l *configLoader) WatchConfig(ctx context.Context, path string) error {
secretNamespacedName, isKubeSecret := strings.CutPrefix(path, "kube:")
if isKubeSecret {
secretNamespace, secretName, ok := strings.Cut(secretNamespacedName, string(types.Separator))
if !ok {
return fmt.Errorf("invalid Kubernetes Secret reference %q, expected format <namespace>/<name>", path)
}
if err := l.watchConfigSecretChanges(ctx, secretNamespace, secretName); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("error watching config Secret %q: %w", secretNamespacedName, err)
}
return nil
}
if err := l.watchConfigFileChanges(ctx, path); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("error watching config file %q: %w", path, err)
}
return nil
}
func (l *configLoader) reloadConfig(ctx context.Context, raw []byte) error {
if bytes.Equal(raw, l.previous) {
if l.cfgIgnored != nil && testenv.InTest() {
l.once.Do(func() {
close(l.cfgIgnored)
})
}
return nil
}
cfg, err := conf.Load(raw)
if err != nil {
return fmt.Errorf("error loading config: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case l.cfgChan <- &cfg:
}
l.previous = raw
return nil
}
func (l *configLoader) watchConfigFileChanges(ctx context.Context, path string) error {
var (
tickChan <-chan time.Time
eventChan <-chan fsnotify.Event
errChan <-chan error
)
if w, err := fsnotify.NewWatcher(); err != nil {
// Creating a new fsnotify watcher would fail for example if inotify was not able to create a new file descriptor.
// See https://github.com/tailscale/tailscale/issues/15081
l.logger.Infof("Failed to create fsnotify watcher on config file %q; watching for changes on 5s timer: %v", path, err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
dir := filepath.Dir(path)
file := filepath.Base(path)
l.logger.Infof("Watching directory %q for changes to config file %q", dir, file)
defer w.Close()
if err := w.Add(dir); err != nil {
return fmt.Errorf("failed to add fsnotify watch: %w", err)
}
eventChan = w.Events
errChan = w.Errors
}
// Read the initial config file, but after the watcher is already set up to
// avoid an unlucky race condition if the config file is edited in between.
b, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading config file %q: %w", path, err)
}
if err := l.reloadConfig(ctx, b); err != nil {
return fmt.Errorf("error loading initial config file %q: %w", path, err)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok := <-errChan:
if !ok {
// Watcher was closed.
return nil
}
return fmt.Errorf("watcher error: %w", err)
case <-tickChan:
case ev, ok := <-eventChan:
if !ok {
// Watcher was closed.
return nil
}
if ev.Name != path || ev.Op&fsnotify.Write == 0 {
// Ignore irrelevant events.
continue
}
}
b, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("error reading config file: %w", err)
}
// Writers such as os.WriteFile may truncate the file before writing
// new contents, so it's possible to read an empty file if we read before
// the write has completed.
if len(b) == 0 {
continue
}
if err := l.reloadConfig(ctx, b); err != nil {
return fmt.Errorf("error reloading config file %q: %v", path, err)
}
}
}
func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretNamespace, secretName string) error {
secrets := l.client.Secrets(secretNamespace)
w, err := secrets.Watch(ctx, metav1.ListOptions{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
// Re-watch regularly to avoid relying on long-lived connections.
// See https://github.com/kubernetes-client/javascript/issues/596#issuecomment-786419380
TimeoutSeconds: ptr.To(int64(600)),
FieldSelector: fmt.Sprintf("metadata.name=%s", secretName),
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to watch config Secret %q: %w", secretName, err)
}
defer func() {
// May not be the original watcher by the time we exit.
if w != nil {
w.Stop()
}
}()
// Get the initial config Secret now we've got the watcher set up.
secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get config Secret %q: %w", secretName, err)
}
if err := l.configFromSecret(ctx, secret); err != nil {
return fmt.Errorf("error loading initial config: %w", err)
}
l.logger.Infof("Watching config Secret %q for changes", secretName)
for {
var secret *corev1.Secret
select {
case <-ctx.Done():
return ctx.Err()
case ev, ok := <-w.ResultChan():
if !ok {
w.Stop()
w, err = secrets.Watch(ctx, metav1.ListOptions{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
TimeoutSeconds: ptr.To(int64(600)),
FieldSelector: fmt.Sprintf("metadata.name=%s", secretName),
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to re-watch config Secret %q: %w", secretName, err)
}
continue
}
switch ev.Type {
case watch.Added, watch.Modified:
// New config available to load.
var ok bool
secret, ok = ev.Object.(*corev1.Secret)
if !ok {
return fmt.Errorf("unexpected object type %T in watch event for config Secret %q", ev.Object, secretName)
}
if secret == nil || secret.Data == nil {
continue
}
if err := l.configFromSecret(ctx, secret); err != nil {
return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err)
}
case watch.Error:
return fmt.Errorf("error watching config Secret %q: %v", secretName, ev.Object)
default:
// Ignore, no action required.
continue
}
}
}
}
func (l *configLoader) configFromSecret(ctx context.Context, s *corev1.Secret) error {
b := s.Data[kubetypes.KubeAPIServerConfigFile]
if len(b) == 0 {
return fmt.Errorf("config Secret %q does not contain expected config in key %q", s.Name, kubetypes.KubeAPIServerConfigFile)
}
if err := l.reloadConfig(ctx, b); err != nil {
return err
}
return nil
}
@@ -0,0 +1,245 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package config
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/kube/kubetypes"
"tailscale.com/types/ptr"
)
func TestWatchConfig(t *testing.T) {
type phase struct {
config string
cancel bool
expectedConf *conf.ConfigV1Alpha1
expectedErr string
}
// Same set of behaviour tests for each config source.
for _, env := range []string{"file", "kube"} {
t.Run(env, func(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
initialConfig string
phases []phase
}{
{
name: "no_config",
phases: []phase{{
expectedErr: "error loading initial config",
}},
},
{
name: "valid_config",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
}},
},
{
name: "can_cancel",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{
{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
},
{
cancel: true,
},
},
},
{
name: "can_reload",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{
{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
},
{
config: `{"version": "v1alpha1", "authKey": "def456"}`,
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("def456"),
},
},
},
},
{
name: "ignores_events_with_no_changes",
initialConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
phases: []phase{
{
expectedConf: &conf.ConfigV1Alpha1{
AuthKey: ptr.To("abc123"),
},
},
{
config: `{"version": "v1alpha1", "authKey": "abc123"}`,
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
root := t.TempDir()
cl := fake.NewClientset()
var cfgPath string
var writeFile func(*testing.T, string)
if env == "file" {
cfgPath = filepath.Join(root, kubetypes.KubeAPIServerConfigFile)
writeFile = func(t *testing.T, content string) {
if err := os.WriteFile(cfgPath, []byte(content), 0o644); err != nil {
t.Fatalf("error writing config file %q: %v", cfgPath, err)
}
}
} else {
cfgPath = "kube:default/config-secret"
writeFile = func(t *testing.T, content string) {
s := secretFrom(content)
mustCreateOrUpdate(t, cl, s)
}
}
configChan := make(chan *conf.Config)
l := NewConfigLoader(zap.Must(zap.NewDevelopment()).Sugar(), cl.CoreV1(), configChan)
l.cfgIgnored = make(chan struct{})
errs := make(chan error)
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
writeFile(t, tc.initialConfig)
go func() {
errs <- l.WatchConfig(ctx, cfgPath)
}()
for i, p := range tc.phases {
if p.config != "" {
writeFile(t, p.config)
}
if p.cancel {
cancel()
}
select {
case cfg := <-configChan:
if diff := cmp.Diff(*p.expectedConf, cfg.Parsed); diff != "" {
t.Errorf("unexpected config (-want +got):\n%s", diff)
}
case err := <-errs:
if p.cancel {
if err != nil {
t.Fatalf("unexpected error after cancel: %v", err)
}
} else if p.expectedErr == "" {
t.Fatalf("unexpected error: %v", err)
} else if !strings.Contains(err.Error(), p.expectedErr) {
t.Fatalf("expected error to contain %q, got %q", p.expectedErr, err.Error())
}
case <-l.cfgIgnored:
if p.expectedConf != nil {
t.Fatalf("expected config to be reloaded, but got ignored signal")
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for expected event in phase: %d", i)
}
}
})
}
})
}
}
func TestWatchConfigSecret_Rewatches(t *testing.T) {
cl := fake.NewClientset()
var watchCount int
var watcher *watch.RaceFreeFakeWatcher
expected := []string{
`{"version": "v1alpha1", "authKey": "abc123"}`,
`{"version": "v1alpha1", "authKey": "def456"}`,
`{"version": "v1alpha1", "authKey": "ghi789"}`,
}
cl.PrependWatchReactor("secrets", func(action ktesting.Action) (handled bool, ret watch.Interface, err error) {
watcher = watch.NewRaceFreeFake()
watcher.Add(secretFrom(expected[watchCount]))
if action.GetVerb() == "watch" && action.GetResource().Resource == "secrets" {
watchCount++
}
return true, watcher, nil
})
configChan := make(chan *conf.Config)
l := NewConfigLoader(zap.Must(zap.NewDevelopment()).Sugar(), cl.CoreV1(), configChan)
mustCreateOrUpdate(t, cl, secretFrom(expected[0]))
errs := make(chan error)
go func() {
errs <- l.watchConfigSecretChanges(t.Context(), "default", "config-secret")
}()
for i := range 2 {
select {
case cfg := <-configChan:
if exp := expected[i]; cfg.Parsed.AuthKey == nil || !strings.Contains(exp, *cfg.Parsed.AuthKey) {
t.Fatalf("expected config to have authKey %q, got: %v", exp, cfg.Parsed.AuthKey)
}
if i == 0 {
watcher.Stop()
}
case err := <-errs:
t.Fatalf("unexpected error: %v", err)
case <-l.cfgIgnored:
t.Fatalf("expected config to be reloaded, but got ignored signal")
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for expected event")
}
}
if watchCount != 2 {
t.Fatalf("expected 2 watch API calls, got %d", watchCount)
}
}
func secretFrom(content string) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "config-secret",
},
Data: map[string][]byte{
kubetypes.KubeAPIServerConfigFile: []byte(content),
},
}
}
func mustCreateOrUpdate(t *testing.T, cl *fake.Clientset, s *corev1.Secret) {
t.Helper()
if _, err := cl.CoreV1().Secrets("default").Create(t.Context(), s, metav1.CreateOptions{}); err != nil {
if _, updateErr := cl.CoreV1().Secrets("default").Update(t.Context(), s, metav1.UpdateOptions{}); updateErr != nil {
t.Fatalf("error writing config Secret %q: %v", s.Name, updateErr)
}
}
}